[AIRFLOW-1809] Update tests to use timezone aware objects

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9624f5f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9624f5f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9624f5f2

Branch: refs/heads/master
Commit: 9624f5f24e00299c66adfd799d2be59fabd17f03
Parents: dcac3e9
Author: Bolke de Bruin <[email protected]>
Authored: Wed Nov 22 16:09:50 2017 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Mon Nov 27 15:54:27 2017 +0100

----------------------------------------------------------------------
 airflow/models.py                               |  7 +-
 airflow/utils/dates.py                          | 11 +--
 airflow/utils/timezone.py                       | 18 +++-
 airflow/www/utils.py                            |  5 +-
 airflow/www/views.py                            | 43 ++++-----
 tests/api/client/test_local_client.py           | 31 +------
 tests/contrib/operators/test_fs_operator.py     |  5 +-
 .../operators/test_jira_operator_test.py        |  5 +-
 tests/contrib/operators/test_sftp_operator.py   | 21 ++---
 .../operators/test_spark_submit_operator.py     |  9 +-
 tests/contrib/operators/test_ssh_operator.py    | 12 +--
 tests/contrib/sensors/test_jira_sensor_test.py  |  6 +-
 tests/contrib/sensors/test_redis_sensor.py      |  4 +-
 tests/core.py                                   | 32 +++----
 tests/dags/test_cli_triggered_dags.py           |  4 +-
 tests/executors/dask_executor.py                | 13 ++-
 tests/impersonation.py                          |  2 +-
 tests/jobs.py                                   | 33 +++----
 tests/models.py                                 | 91 +++++++++++---------
 tests/operators/latest_only_operator.py         | 51 +++++------
 tests/operators/operators.py                    |  7 +-
 tests/operators/python_operator.py              | 11 +--
 tests/operators/sensors.py                      |  8 +-
 tests/operators/subdag_operator.py              |  5 +-
 tests/operators/test_virtualenv_operator.py     | 11 +--
 .../deps/test_not_in_retry_period_dep.py        |  3 +-
 .../ti_deps/deps/test_runnable_exec_date_dep.py |  3 +-
 tests/utils/log/test_file_processor_handler.py  | 10 +--
 tests/utils/log/test_s3_task_handler.py         |  2 +-
 tests/utils/test_dates.py                       | 14 +--
 tests/utils/test_log_handlers.py                |  2 +-
 tests/www/api/experimental/test_endpoints.py    |  9 +-
 tests/www/test_views.py                         |  2 +-
 33 files changed, 251 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 33f3636..e93e8a8 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2892,7 +2892,11 @@ class DAG(BaseDag, LoggingMixin):
         # set timezone
         if start_date and start_date.tzinfo:
             self.timezone = start_date.tzinfo
-        elif 'start_date' in self.default_args and 
self.default_args['start_date'].tzinfo:
+        elif 'start_date' in self.default_args:
+            if isinstance(self.default_args['start_date'], six.string_types):
+                self.default_args['start_date'] = (
+                    timezone.parse(self.default_args['start_date'])
+                )
             self.timezone = self.default_args['start_date'].tzinfo
         else:
             self.timezone = settings.TIMEZONE
@@ -3066,7 +3070,6 @@ class DAG(BaseDag, LoggingMixin):
         # in case of @once
         if not following:
             return dttm
-
         if self.previous_schedule(following) != dttm:
             return following
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/utils/dates.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index 7d0d9d9..dab2b0d 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -99,7 +99,7 @@ def date_range(
     return sorted(l)
 
 
-def round_time(dt, delta, start_date=datetime.min):
+def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)):
     """
     Returns the datetime of the form start_date + i * delta
     which is closest to dt for any non-negative integer i.
@@ -232,11 +232,4 @@ def parse_execution_date(execution_date_str):
     """
     Parse execution date string to datetime object.
     """
-    try:
-        # Execution date follows execution date format of scheduled executions,
-        # e.g. '2017-11-02 00:00:00'
-        return datetime.strptime(execution_date_str, '%Y-%m-%d %H:%M:%S')
-    except ValueError:
-        # Execution date follows execution date format of manually triggered 
executions,
-        # e.g. '2017-11-05 16:18:30..989729'
-        return datetime.strptime(execution_date_str, '%Y-%m-%d %H:%M:%S..%f')
+    return timezone.parse(execution_date_str)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/utils/timezone.py
----------------------------------------------------------------------
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index 5ae7802..e384a14 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -50,7 +50,13 @@ def utcnow():
     :return:
     """
 
-    return pendulum.utcnow()
+    # pendulum utcnow() is not used as that sets a TimezoneInfo object
+    # instead of a Timezone. This is not pickable and also creates issues
+    # when using replace()
+    d = dt.datetime.utcnow()
+    d = d.replace(tzinfo=utc)
+
+    return d
 
 
 def convert_to_utc(value):
@@ -94,7 +100,7 @@ def make_aware(value, timezone=None):
         return timezone.convert(value)
     else:
         # This may be wrong around DST changes!
-        return value.replace(tzinfo=timezone)
+        return value.astimezone(tz=timezone)
 
 
 def make_naive(value, timezone=None):
@@ -136,3 +142,11 @@ def datetime(*args, **kwargs):
         kwargs['tzinfo'] = TIMEZONE
 
     return dt.datetime(*args, **kwargs)
+
+
+def parse(string):
+    """
+    Parse a time string and return an aware datetime
+    :param string: time string
+    """
+    return pendulum.parse(string, tz=TIMEZONE)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/www/utils.py
----------------------------------------------------------------------
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index aba85fa..a0833ee 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -21,7 +21,6 @@ from cgi import escape
 from io import BytesIO as IO
 import functools
 import gzip
-import iso8601
 import json
 import time
 
@@ -34,6 +33,7 @@ from wtforms.compat import text_type
 
 from airflow import configuration, models, settings
 from airflow.utils.db import create_session
+from airflow.utils import timezone
 from airflow.utils.json import AirflowJsonEncoder
 
 AUTHENTICATE = configuration.getboolean('webserver', 'AUTHENTICATE')
@@ -255,8 +255,7 @@ def action_logging(f):
             dag_id=request.args.get('dag_id'))
 
         if 'execution_date' in request.args:
-            log.execution_date = iso8601.parse_date(
-                request.args.get('execution_date'), settings.TIMEZONE)
+            log.execution_date = 
timezone.parse(request.args.get('execution_date'))
 
         with create_session() as session:
             session.add(log)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 550a7f8..5ecee42 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -22,11 +22,11 @@ import pkg_resources
 import socket
 from functools import wraps
 from datetime import timedelta
-import dateutil.parser
 import copy
 import math
 import json
 import bleach
+import pendulum
 from collections import defaultdict
 
 import inspect
@@ -78,6 +78,7 @@ from airflow.utils.state import State
 from airflow.utils.db import create_session, provide_session
 from airflow.utils.helpers import alchemy_to_dict
 from airflow.utils.dates import infer_time_unit, scale_time_units, 
parse_execution_date
+from airflow.utils.timezone import datetime
 from airflow.www import utils as wwwutils
 from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm
 from airflow.www.validators import GreaterEqualThan
@@ -669,7 +670,7 @@ class Airflow(BaseView):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
-        dttm = dateutil.parser.parse(execution_date)
+        dttm = pendulum.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
         task = copy.copy(dag.get_task(task_id))
@@ -705,7 +706,7 @@ class Airflow(BaseView):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
-        dttm = dateutil.parser.parse(execution_date)
+        dttm = pendulum.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
         ti = session.query(models.TaskInstance).filter(
@@ -746,7 +747,7 @@ class Airflow(BaseView):
         # Carrying execution_date through, even though it's irrelevant for
         # this context
         execution_date = request.args.get('execution_date')
-        dttm = dateutil.parser.parse(execution_date)
+        dttm = pendulum.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
 
@@ -823,7 +824,7 @@ class Airflow(BaseView):
         # Carrying execution_date through, even though it's irrelevant for
         # this context
         execution_date = request.args.get('execution_date')
-        dttm = dateutil.parser.parse(execution_date)
+        dttm = pendulum.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
         if not dag or task_id not in dag.task_ids:
@@ -863,7 +864,7 @@ class Airflow(BaseView):
         task = dag.get_task(task_id)
 
         execution_date = request.args.get('execution_date')
-        execution_date = dateutil.parser.parse(execution_date)
+        execution_date = pendulum.parse(execution_date)
         ignore_all_deps = request.args.get('ignore_all_deps') == "true"
         ignore_task_deps = request.args.get('ignore_task_deps') == "true"
         ignore_ti_state = request.args.get('ignore_ti_state') == "true"
@@ -987,7 +988,7 @@ class Airflow(BaseView):
         dag = dagbag.get_dag(dag_id)
 
         execution_date = request.args.get('execution_date')
-        execution_date = dateutil.parser.parse(execution_date)
+        execution_date = pendulum.parse(execution_date)
         confirmed = request.args.get('confirmed') == "true"
         upstream = request.args.get('upstream') == "true"
         downstream = request.args.get('downstream') == "true"
@@ -1018,7 +1019,7 @@ class Airflow(BaseView):
         confirmed = request.args.get('confirmed') == "true"
 
         dag = dagbag.get_dag(dag_id)
-        execution_date = dateutil.parser.parse(execution_date)
+        execution_date = pendulum.parse(execution_date)
         start_date = execution_date
         end_date = execution_date
 
@@ -1062,7 +1063,7 @@ class Airflow(BaseView):
             flash('Invalid execution date', 'error')
             return redirect(origin)
 
-        execution_date = dateutil.parser.parse(execution_date)
+        execution_date = pendulum.parse(execution_date)
         dag = dagbag.get_dag(dag_id)
 
         if not dag:
@@ -1099,7 +1100,7 @@ class Airflow(BaseView):
         task.dag = dag
 
         execution_date = request.args.get('execution_date')
-        execution_date = dateutil.parser.parse(execution_date)
+        execution_date = pendulum.parse(execution_date)
         confirmed = request.args.get('confirmed') == "true"
         upstream = request.args.get('upstream') == "true"
         downstream = request.args.get('downstream') == "true"
@@ -1160,7 +1161,7 @@ class Airflow(BaseView):
         num_runs = int(num_runs) if num_runs else 25
 
         if base_date:
-            base_date = dateutil.parser.parse(base_date)
+            base_date = pendulum.parse(base_date)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -1218,7 +1219,7 @@ class Airflow(BaseView):
             def set_duration(tid):
                 if (isinstance(tid, dict) and tid.get("state") == 
State.RUNNING and
                             tid["start_date"] is not None):
-                    d = timezone.utcnow() - 
dateutil.parser.parse(tid["start_date"])
+                    d = timezone.utcnow() - pendulum.parse(tid["start_date"])
                     tid["duration"] = d.total_seconds()
                 return tid
 
@@ -1313,9 +1314,9 @@ class Airflow(BaseView):
 
         dttm = request.args.get('execution_date')
         if dttm:
-            dttm = dateutil.parser.parse(dttm)
+            dttm = pendulum.parse(dttm)
         else:
-            dttm = dag.latest_execution_date or timezone.utcnow().date()
+            dttm = dag.latest_execution_date or timezone.utcnow()
 
         DR = models.DagRun
         drs = (
@@ -1389,7 +1390,7 @@ class Airflow(BaseView):
         num_runs = int(num_runs) if num_runs else 25
 
         if base_date:
-            base_date = dateutil.parser.parse(base_date)
+            base_date = pendulum.parse(base_date)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -1496,7 +1497,7 @@ class Airflow(BaseView):
         num_runs = int(num_runs) if num_runs else 25
 
         if base_date:
-            base_date = dateutil.parser.parse(base_date)
+            base_date = pendulum.parse(base_date)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -1559,7 +1560,7 @@ class Airflow(BaseView):
         num_runs = int(num_runs) if num_runs else 25
 
         if base_date:
-            base_date = dateutil.parser.parse(base_date)
+            base_date = pendulum.parse(base_date)
         else:
             base_date = dag.latest_execution_date or timezone.utcnow()
 
@@ -1686,9 +1687,9 @@ class Airflow(BaseView):
 
         dttm = request.args.get('execution_date')
         if dttm:
-            dttm = dateutil.parser.parse(dttm)
+            dttm = pendulum.parse(dttm)
         else:
-            dttm = dag.latest_execution_date or timezone.utcnow().date()
+            dttm = dag.latest_execution_date or timezone.utcnow()
 
         form = DateTimeForm(data={'execution_date': dttm})
 
@@ -1741,7 +1742,7 @@ class Airflow(BaseView):
 
         dttm = request.args.get('execution_date')
         if dttm:
-            dttm = dateutil.parser.parse(dttm)
+            dttm = pendulum.parse(dttm)
         else:
             return ("Error: Invalid execution_date")
 
@@ -2586,7 +2587,7 @@ class TaskInstanceModelView(ModelViewOnly):
         https://github.com/flask-admin/flask-admin/issues/1226
         """
         task_id, dag_id, execution_date = iterdecode(id)
-        execution_date = dateutil.parser.parse(execution_date)
+        execution_date = pendulum.parse(execution_date)
         return self.session.query(self.model).get((task_id, dag_id, 
execution_date))
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/api/client/test_local_client.py
----------------------------------------------------------------------
diff --git a/tests/api/client/test_local_client.py 
b/tests/api/client/test_local_client.py
index 7a759fe..31a1712 100644
--- a/tests/api/client/test_local_client.py
+++ b/tests/api/client/test_local_client.py
@@ -12,45 +12,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
 import json
 import unittest
 
+from freezegun import freeze_time
 from mock import patch
 
 from airflow import AirflowException
 from airflow.api.client.local_client import Client
 from airflow import models
 from airflow import settings
+from airflow.utils import timezone
 from airflow.utils.state import State
 
-EXECDATE = datetime.datetime.now()
+EXECDATE = timezone.utcnow()
 EXECDATE_NOFRACTIONS = EXECDATE.replace(microsecond=0)
 EXECDATE_ISO = EXECDATE_NOFRACTIONS.isoformat()
 
-real_datetime_class = datetime.datetime
-
-
-def mock_datetime_now(target, dt):
-    class DatetimeSubclassMeta(type):
-        @classmethod
-        def __instancecheck__(mcs, obj):
-            return isinstance(obj, real_datetime_class)
-
-    class BaseMockedDatetime(real_datetime_class):
-        @classmethod
-        def now(cls, tz=None):
-            return target.replace(tzinfo=tz)
-
-        @classmethod
-        def utcnow(cls):
-            return target
-
-    # Python2 & Python3 compatible metaclass
-    MockedDatetime = DatetimeSubclassMeta('datetime', (BaseMockedDatetime,), 
{})
-
-    return patch.object(dt, 'datetime', MockedDatetime)
-
 
 class TestLocalClient(unittest.TestCase):
 
@@ -81,8 +59,7 @@ class TestLocalClient(unittest.TestCase):
         with self.assertRaises(AirflowException):
             client.trigger_dag(dag_id="blablabla")
 
-        import airflow.api.common.experimental.trigger_dag
-        with mock_datetime_now(EXECDATE, 
airflow.api.common.experimental.trigger_dag.datetime):
+        with freeze_time(EXECDATE):
             # no execution date, execution date should be set automatically
             client.trigger_dag(dag_id="test_start_date_scheduling")
             
mock.assert_called_once_with(run_id="manual__{0}".format(EXECDATE_ISO),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_fs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_fs_operator.py 
b/tests/contrib/operators/test_fs_operator.py
index f990157..2ef4286 100644
--- a/tests/contrib/operators/test_fs_operator.py
+++ b/tests/contrib/operators/test_fs_operator.py
@@ -14,12 +14,12 @@
 #
 
 import unittest
-from datetime import datetime
 
 from airflow import configuration
 from airflow.settings import Session
 from airflow import models, DAG
 from airflow.contrib.operators.fs_operator import FileSensor
+from airflow.utils.timezone import datetime
 
 TEST_DAG_ID = 'unit_tests'
 DEFAULT_DATE = datetime(2015, 1, 1)
@@ -33,8 +33,10 @@ def reset(dag_id=TEST_DAG_ID):
     session.commit()
     session.close()
 
+
 reset()
 
+
 class FileSensorTest(unittest.TestCase):
     def setUp(self):
         configuration.load_test_config()
@@ -60,5 +62,6 @@ class FileSensorTest(unittest.TestCase):
         )
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
+
 if __name__ == '__main__':
     unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_jira_operator_test.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_jira_operator_test.py 
b/tests/contrib/operators/test_jira_operator_test.py
index 6d615df..566cca4 100644
--- a/tests/contrib/operators/test_jira_operator_test.py
+++ b/tests/contrib/operators/test_jira_operator_test.py
@@ -14,7 +14,7 @@
 #
 
 import unittest
-import datetime
+
 from mock import Mock
 from mock import patch
 
@@ -22,8 +22,9 @@ from airflow import DAG, configuration
 from airflow.contrib.operators.jira_operator import JiraOperator
 from airflow import models
 from airflow.utils import db
+from airflow.utils import timezone
 
-DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 jira_client_mock = Mock(
         name="jira_client_for_test"
 )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_sftp_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_sftp_operator.py 
b/tests/contrib/operators/test_sftp_operator.py
index 39e8d88..81e0c9e 100644
--- a/tests/contrib/operators/test_sftp_operator.py
+++ b/tests/contrib/operators/test_sftp_operator.py
@@ -15,7 +15,6 @@
 import os
 import unittest
 from base64 import b64encode
-from datetime import datetime
 
 from airflow import configuration
 from airflow import models
@@ -23,6 +22,8 @@ from airflow.contrib.operators.sftp_operator import 
SFTPOperator, SFTPOperation
 from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.models import DAG, TaskInstance
 from airflow.settings import Session
+from airflow.utils import timezone
+from airflow.utils.timezone import datetime
 
 TEST_DAG_ID = 'unit_tests'
 DEFAULT_DATE = datetime(2017, 1, 1)
@@ -80,7 +81,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(put_test_task)
-        ti2 = TaskInstance(task=put_test_task, execution_date=datetime.now())
+        ti2 = TaskInstance(task=put_test_task, 
execution_date=timezone.utcnow())
         ti2.run()
 
         # check the remote file content
@@ -92,7 +93,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(check_file_task)
-        ti3 = TaskInstance(task=check_file_task, execution_date=datetime.now())
+        ti3 = TaskInstance(task=check_file_task, 
execution_date=timezone.utcnow())
         ti3.run()
         self.assertEqual(
                 ti3.xcom_pull(task_ids='test_check_file', 
key='return_value').strip(),
@@ -117,7 +118,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(put_test_task)
-        ti2 = TaskInstance(task=put_test_task, execution_date=datetime.now())
+        ti2 = TaskInstance(task=put_test_task, 
execution_date=timezone.utcnow())
         ti2.run()
 
         # check the remote file content
@@ -129,7 +130,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(check_file_task)
-        ti3 = TaskInstance(task=check_file_task, execution_date=datetime.now())
+        ti3 = TaskInstance(task=check_file_task, 
execution_date=timezone.utcnow())
         ti3.run()
         self.assertEqual(
                 ti3.xcom_pull(task_ids='test_check_file', 
key='return_value').strip(),
@@ -152,7 +153,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(create_file_task)
-        ti1 = TaskInstance(task=create_file_task, 
execution_date=datetime.now())
+        ti1 = TaskInstance(task=create_file_task, 
execution_date=timezone.utcnow())
         ti1.run()
 
         # get remote file to local
@@ -165,7 +166,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(get_test_task)
-        ti2 = TaskInstance(task=get_test_task, execution_date=datetime.now())
+        ti2 = TaskInstance(task=get_test_task, 
execution_date=timezone.utcnow())
         ti2.run()
 
         # test the received content
@@ -190,7 +191,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(create_file_task)
-        ti1 = TaskInstance(task=create_file_task, 
execution_date=datetime.now())
+        ti1 = TaskInstance(task=create_file_task, 
execution_date=timezone.utcnow())
         ti1.run()
 
         # get remote file to local
@@ -203,7 +204,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(get_test_task)
-        ti2 = TaskInstance(task=get_test_task, execution_date=datetime.now())
+        ti2 = TaskInstance(task=get_test_task, 
execution_date=timezone.utcnow())
         ti2.run()
 
         # test the received content
@@ -227,7 +228,7 @@ class SFTPOperatorTest(unittest.TestCase):
                 dag=self.dag
         )
         self.assertIsNotNone(remove_file_task)
-        ti3 = TaskInstance(task=remove_file_task, 
execution_date=datetime.now())
+        ti3 = TaskInstance(task=remove_file_task, 
execution_date=timezone.utcnow())
         ti3.run()
 
     def tearDown(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_spark_submit_operator.py 
b/tests/contrib/operators/test_spark_submit_operator.py
index 0731da9..4e72eea 100644
--- a/tests/contrib/operators/test_spark_submit_operator.py
+++ b/tests/contrib/operators/test_spark_submit_operator.py
@@ -14,15 +14,17 @@
 #
 
 import unittest
-import datetime
 import sys
 
 from airflow import DAG, configuration
 from airflow.models import TaskInstance
 
 from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
+from airflow.utils import timezone
 
-DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+from datetime import timedelta
+
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 
 
 class TestSparkSubmitOperator(unittest.TestCase):
@@ -146,7 +148,7 @@ class TestSparkSubmitOperator(unittest.TestCase):
         # Then
         expected_application_args = [u'-f', 'foo',
                                      u'--bar', 'bar',
-                                     u'--start', (DEFAULT_DATE - 
datetime.timedelta(days=1)).strftime("%Y-%m-%d"),
+                                     u'--start', (DEFAULT_DATE - 
timedelta(days=1)).strftime("%Y-%m-%d"),
                                      u'--end', 
DEFAULT_DATE.strftime("%Y-%m-%d"),
                                      u'--with-spaces', u'args should keep 
embdedded spaces',
                                      ]
@@ -154,5 +156,6 @@ class TestSparkSubmitOperator(unittest.TestCase):
         self.assertListEqual(expected_application_args, getattr(operator, 
'_application_args'))
         self.assertEqual(expected_name, getattr(operator, '_name'))
 
+
 if __name__ == '__main__':
     unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_ssh_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_ssh_operator.py 
b/tests/contrib/operators/test_ssh_operator.py
index 019dfe4..4cec913 100644
--- a/tests/contrib/operators/test_ssh_operator.py
+++ b/tests/contrib/operators/test_ssh_operator.py
@@ -14,13 +14,14 @@
 
 import unittest
 from base64 import b64encode
-from datetime import datetime
 
 from airflow import configuration
 from airflow import models
 from airflow.contrib.operators.ssh_operator import SSHOperator
 from airflow.models import DAG, TaskInstance
 from airflow.settings import Session
+from airflow.utils import timezone
+from airflow.utils.timezone import datetime
 
 TEST_DAG_ID = 'unit_tests'
 DEFAULT_DATE = datetime(2017, 1, 1)
@@ -65,7 +66,7 @@ class SSHOperatorTest(unittest.TestCase):
         self.assertIsNotNone(task)
 
         ti = TaskInstance(
-                task=task, execution_date=datetime.now())
+                task=task, execution_date=timezone.utcnow())
         ti.run()
         self.assertIsNotNone(ti.duration)
         self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'),
@@ -84,7 +85,7 @@ class SSHOperatorTest(unittest.TestCase):
         self.assertIsNotNone(task)
 
         ti = TaskInstance(
-                task=task, execution_date=datetime.now())
+                task=task, execution_date=timezone.utcnow())
         ti.run()
         self.assertIsNotNone(ti.duration)
         self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), 
b'airflow')
@@ -102,7 +103,7 @@ class SSHOperatorTest(unittest.TestCase):
         self.assertIsNotNone(task)
 
         ti = TaskInstance(
-            task=task, execution_date=datetime.now())
+            task=task, execution_date=timezone.utcnow())
         ti.run()
         self.assertIsNotNone(ti.duration)
         self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), 
b'airflow')
@@ -120,10 +121,11 @@ class SSHOperatorTest(unittest.TestCase):
         self.assertIsNotNone(task)
 
         ti = TaskInstance(
-            task=task, execution_date=datetime.now())
+            task=task, execution_date=timezone.utcnow())
         ti.run()
         self.assertIsNotNone(ti.duration)
         self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), 
b'')
 
+
 if __name__ == '__main__':
     unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/sensors/test_jira_sensor_test.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_jira_sensor_test.py 
b/tests/contrib/sensors/test_jira_sensor_test.py
index 77ca97f..7c16188 100644
--- a/tests/contrib/sensors/test_jira_sensor_test.py
+++ b/tests/contrib/sensors/test_jira_sensor_test.py
@@ -14,16 +14,16 @@
 #
 
 import unittest
-import datetime
+
 from mock import Mock
 from mock import patch
 
 from airflow import DAG, configuration
 from airflow.contrib.sensors.jira_sensor import JiraTicketSensor
 from airflow import models
-from airflow.utils import db
+from airflow.utils import db, timezone
 
-DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 jira_client_mock = Mock(
         name="jira_client_for_test"
 )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/sensors/test_redis_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_redis_sensor.py 
b/tests/contrib/sensors/test_redis_sensor.py
index 8022a92..d627501 100644
--- a/tests/contrib/sensors/test_redis_sensor.py
+++ b/tests/contrib/sensors/test_redis_sensor.py
@@ -14,15 +14,15 @@
 
 
 import unittest
-import datetime
 
 from mock import patch
 
 from airflow import DAG
 from airflow import configuration
 from airflow.contrib.sensors.redis_key_sensor import RedisKeySensor
+from airflow.utils import timezone
 
-DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 
 
 class TestRedisSensor(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 0bd0c87..a57f0ed 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -25,9 +25,10 @@ import multiprocessing
 import mock
 from numpy.testing import assert_array_almost_equal
 import tempfile
-from datetime import datetime, time, timedelta
+from datetime import time, timedelta
 from email.mime.multipart import MIMEMultipart
 from email.mime.application import MIMEApplication
+from freezegun import freeze_time
 import signal
 from six.moves.urllib.parse import urlencode
 from time import sleep
@@ -39,7 +40,6 @@ import sqlalchemy
 from airflow import configuration
 from airflow.executors import SequentialExecutor
 from airflow.models import Variable
-from tests.test_utils.fake_datetime import FakeDatetime
 
 configuration.load_test_config()
 from airflow import jobs, models, DAG, utils, macros, settings, exceptions
@@ -56,6 +56,8 @@ from airflow.hooks.sqlite_hook import SqliteHook
 from airflow.bin import cli
 from airflow.www import app as application
 from airflow.settings import Session
+from airflow.utils import timezone
+from airflow.utils.timezone import datetime
 from airflow.utils.state import State
 from airflow.utils.dates import infer_time_unit, round_time, scale_time_units
 from lxml import html
@@ -208,7 +210,7 @@ class CoreTest(unittest.TestCase):
             owner='Also fake',
             start_date=datetime(2015, 1, 2, 0, 0)))
 
-        start_date = datetime.utcnow()
+        start_date = timezone.utcnow()
 
         run = dag.create_dagrun(
             run_id='test_' + start_date.isoformat(),
@@ -254,7 +256,7 @@ class CoreTest(unittest.TestCase):
 
         self.assertIsNone(additional_dag_run)
 
-    @mock.patch('airflow.jobs.datetime', FakeDatetime)
+    @freeze_time('2016-01-01')
     def test_schedule_dag_no_end_date_up_to_today_only(self):
         """
         Tests that a Dag created without an end_date can only be scheduled up
@@ -264,9 +266,6 @@ class CoreTest(unittest.TestCase):
         start_date of 2015-01-01, only jobs up to, but not including
         2016-01-01 should be scheduled.
         """
-        from datetime import datetime
-        FakeDatetime.utcnow = classmethod(lambda cls: datetime(2016, 1, 1))
-
         session = settings.Session()
         delta = timedelta(days=1)
         start_date = DEFAULT_DATE
@@ -332,7 +331,8 @@ class CoreTest(unittest.TestCase):
         self.assertNotEqual(dag_subclass, self.dag)
 
         # a dag should equal an unpickled version of itself
-        self.assertEqual(pickle.loads(pickle.dumps(self.dag)), self.dag)
+        d = pickle.dumps(self.dag)
+        self.assertEqual(pickle.loads(d), self.dag)
 
         # dags are ordered based on dag_id no matter what the type is
         self.assertLess(self.dag, dag_diff_name)
@@ -1637,7 +1637,7 @@ class SecurityTests(unittest.TestCase):
 
     def tearDown(self):
         configuration.conf.set("webserver", "expose_config", "False")
-        self.dag_bash.clear(start_date=DEFAULT_DATE, 
end_date=datetime.utcnow())
+        self.dag_bash.clear(start_date=DEFAULT_DATE, 
end_date=timezone.utcnow())
 
 class WebUiTests(unittest.TestCase):
     def setUp(self):
@@ -1657,23 +1657,23 @@ class WebUiTests(unittest.TestCase):
         self.example_xcom = self.dagbag.dags['example_xcom']
 
         self.dagrun_bash2 = self.dag_bash2.create_dagrun(
-            
run_id="test_{}".format(models.DagRun.id_for_date(datetime.utcnow())),
+            
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
             execution_date=DEFAULT_DATE,
-            start_date=datetime.utcnow(),
+            start_date=timezone.utcnow(),
             state=State.RUNNING
         )
 
         self.sub_dag.create_dagrun(
-            
run_id="test_{}".format(models.DagRun.id_for_date(datetime.utcnow())),
+            
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
             execution_date=DEFAULT_DATE,
-            start_date=datetime.utcnow(),
+            start_date=timezone.utcnow(),
             state=State.RUNNING
         )
 
         self.example_xcom.create_dagrun(
-            
run_id="test_{}".format(models.DagRun.id_for_date(datetime.utcnow())),
+            
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
             execution_date=DEFAULT_DATE,
-            start_date=datetime.utcnow(),
+            start_date=timezone.utcnow(),
             state=State.RUNNING
         )
 
@@ -1849,7 +1849,7 @@ class WebUiTests(unittest.TestCase):
 
     def tearDown(self):
         configuration.conf.set("webserver", "expose_config", "False")
-        self.dag_bash.clear(start_date=DEFAULT_DATE, 
end_date=datetime.utcnow())
+        self.dag_bash.clear(start_date=DEFAULT_DATE, 
end_date=timezone.utcnow())
         session = Session()
         session.query(models.DagRun).delete()
         session.query(models.TaskInstance).delete()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/dags/test_cli_triggered_dags.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_cli_triggered_dags.py 
b/tests/dags/test_cli_triggered_dags.py
index 5af8fc8..94afe0e 100644
--- a/tests/dags/test_cli_triggered_dags.py
+++ b/tests/dags/test_cli_triggered_dags.py
@@ -13,9 +13,11 @@
 # limitations under the License.
 
 
-from datetime import datetime, timedelta
+from datetime import timedelta
+
 from airflow.models import DAG
 from airflow.operators.python_operator import PythonOperator
+from airflow.utils.timezone import datetime
 
 DEFAULT_DATE = datetime(2016, 1, 1)
 default_args = dict(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/dask_executor.py b/tests/executors/dask_executor.py
index f66a272..decd663 100644
--- a/tests/executors/dask_executor.py
+++ b/tests/executors/dask_executor.py
@@ -12,15 +12,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
 import logging
-import time
 import unittest
 
 from airflow import configuration
 from airflow.models import DAG, DagBag, TaskInstance, State
 from airflow.jobs import BackfillJob
-from airflow.operators.python_operator import PythonOperator
+from airflow.utils import timezone
+
+from datetime import timedelta
 
 try:
     from airflow.executors.dask_executor import DaskExecutor
@@ -34,7 +34,7 @@ if 'sqlite' in configuration.get('core', 'sql_alchemy_conn'):
     logging.error('sqlite does not support concurrent access')
     SKIP_DASK = True
 
-DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 
 
 class DaskExecutorTest(unittest.TestCase):
@@ -63,9 +63,9 @@ class DaskExecutorTest(unittest.TestCase):
             k for k, v in executor.futures.items() if v == 'fail')
 
         # wait for the futures to execute, with a timeout
-        timeout = datetime.datetime.now() + datetime.timedelta(seconds=30)
+        timeout = timezone.utcnow() + timedelta(seconds=30)
         while not (success_future.done() and fail_future.done()):
-            if datetime.datetime.now() > timeout:
+            if timezone.utcnow() > timeout:
                 raise ValueError(
                     'The futures should have finished; there is probably '
                     'an error communciating with the Dask cluster.')
@@ -80,7 +80,6 @@ class DaskExecutorTest(unittest.TestCase):
 
         cluster.close()
 
-
     @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
     def test_backfill_integration(self):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/impersonation.py
----------------------------------------------------------------------
diff --git a/tests/impersonation.py b/tests/impersonation.py
index 0777def..5355c9a 100644
--- a/tests/impersonation.py
+++ b/tests/impersonation.py
@@ -19,7 +19,7 @@ import unittest
 
 from airflow import jobs, models
 from airflow.utils.state import State
-from datetime import datetime
+from airflow.utils.timezone import datetime
 
 DEV_NULL = '/dev/null'
 TEST_DAG_FOLDER = os.path.join(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 119e1b4..ca2db2c 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -37,6 +37,7 @@ from airflow.models import DAG, DagModel, DagBag, DagRun, 
Pool, TaskInstance as
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
 from airflow.task_runner.base_task_runner import BaseTaskRunner
+from airflow.utils import timezone
 from airflow.utils.dates import days_ago
 from airflow.utils.db import provide_session
 from airflow.utils.state import State
@@ -63,7 +64,7 @@ except ImportError:
         mock = None
 
 DEV_NULL = '/dev/null'
-DEFAULT_DATE = datetime.datetime(2016, 1, 1)
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
 # Include the words "airflow" and "dag" in the file contents, tricking airflow 
into thinking these
 # files contain a DAG (otherwise Airflow will skip them)
@@ -659,7 +660,7 @@ class BackfillJobTest(unittest.TestCase):
         subdag = subdag_op_task.subdag
         subdag.schedule_interval = '@daily'
 
-        start_date = datetime.datetime.now()
+        start_date = timezone.utcnow()
         executor = TestExecutor(do_update=True)
         job = BackfillJob(dag=subdag,
                           start_date=start_date,
@@ -1838,7 +1839,7 @@ class SchedulerJobTest(unittest.TestCase):
         """
         dag = DAG(
             'test_scheduler_dagrun_once',
-            start_date=datetime.datetime(2015, 1, 1),
+            start_date=timezone.datetime(2015, 1, 1),
             schedule_interval="@once")
 
         scheduler = SchedulerJob()
@@ -1912,7 +1913,7 @@ class SchedulerJobTest(unittest.TestCase):
     def test_scheduler_do_not_schedule_too_early(self):
         dag = DAG(
             dag_id='test_scheduler_do_not_schedule_too_early',
-            start_date=datetime.datetime(2200, 1, 1))
+            start_date=timezone.datetime(2200, 1, 1))
         dag_task1 = DummyOperator(
             task_id='dummy',
             dag=dag,
@@ -2059,7 +2060,7 @@ class SchedulerJobTest(unittest.TestCase):
 
         dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
-        dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1)
+        dr.start_date = timezone.utcnow() - datetime.timedelta(days=1)
         session.merge(dr)
         session.commit()
 
@@ -2102,7 +2103,7 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertIsNone(new_dr)
 
         # Should be scheduled as dagrun_timeout has passed
-        dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1)
+        dr.start_date = timezone.utcnow() - datetime.timedelta(days=1)
         session.merge(dr)
         session.commit()
         new_dr = scheduler.create_dag_run(dag)
@@ -2213,7 +2214,7 @@ class SchedulerJobTest(unittest.TestCase):
         """
         dag = DAG(
             dag_id='test_scheduler_auto_align_1',
-            start_date=datetime.datetime(2016, 1, 1, 10, 10, 0),
+            start_date=timezone.datetime(2016, 1, 1, 10, 10, 0),
             schedule_interval="4 5 * * *"
         )
         dag_task1 = DummyOperator(
@@ -2231,11 +2232,11 @@ class SchedulerJobTest(unittest.TestCase):
 
         dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
-        self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 2, 5, 
4))
+        self.assertEquals(dr.execution_date, timezone.datetime(2016, 1, 2, 5, 
4))
 
         dag = DAG(
             dag_id='test_scheduler_auto_align_2',
-            start_date=datetime.datetime(2016, 1, 1, 10, 10, 0),
+            start_date=timezone.datetime(2016, 1, 1, 10, 10, 0),
             schedule_interval="10 10 * * *"
         )
         dag_task1 = DummyOperator(
@@ -2253,7 +2254,7 @@ class SchedulerJobTest(unittest.TestCase):
 
         dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
-        self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 1, 10, 
10))
+        self.assertEquals(dr.execution_date, timezone.datetime(2016, 1, 1, 10, 
10))
 
     def test_scheduler_reschedule(self):
         """
@@ -2458,12 +2459,12 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertTrue(dag.start_date > DEFAULT_DATE)
 
         expected_run_duration = 5
-        start_time = datetime.datetime.now()
+        start_time = timezone.utcnow()
         scheduler = SchedulerJob(dag_id,
                                  run_duration=expected_run_duration,
                                  **self.default_scheduler_args)
         scheduler.run()
-        end_time = datetime.datetime.now()
+        end_time = timezone.utcnow()
 
         run_duration = (end_time - start_time).total_seconds()
         logging.info("Test ran in %.2fs, expected %.2fs",
@@ -2503,7 +2504,7 @@ class SchedulerJobTest(unittest.TestCase):
         Test to check that a DAG returns it's active runs
         """
 
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
         six_hours_ago_to_the_hour = (now - 
datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0)
 
         START_DATE = six_hours_ago_to_the_hour
@@ -2557,7 +2558,7 @@ class SchedulerJobTest(unittest.TestCase):
         Test to check that a DAG with catchup = False only schedules beginning 
now, not back to the start date
         """
 
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
         six_hours_ago_to_the_hour = (now - 
datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0)
         three_minutes_ago = now - datetime.timedelta(minutes=3)
         two_hours_and_three_minutes_ago = three_minutes_ago - 
datetime.timedelta(hours=2)
@@ -2618,7 +2619,7 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertGreater(dr.execution_date, three_minutes_ago)
 
         # The DR should be scheduled BEFORE now
-        self.assertLess(dr.execution_date, datetime.datetime.now())
+        self.assertLess(dr.execution_date, timezone.utcnow())
 
         dag3 = DAG(DAG_NAME3,
                    schedule_interval='@hourly',
@@ -2652,7 +2653,7 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertGreater(dr.execution_date, two_hours_and_three_minutes_ago)
 
         # The DR should be scheduled BEFORE now
-        self.assertLess(dr.execution_date, datetime.datetime.now())
+        self.assertLess(dr.execution_date, timezone.utcnow())
 
     def 
test_add_unparseable_file_before_sched_start_creates_import_error(self):
         try:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index a1de17d..cabcf3a 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -20,6 +20,7 @@ from __future__ import unicode_literals
 import datetime
 import logging
 import os
+import pendulum
 import unittest
 import time
 
@@ -36,13 +37,14 @@ from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
 from airflow.operators.python_operator import ShortCircuitOperator
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
+from airflow.utils import timezone
 from airflow.utils.state import State
 from airflow.utils.trigger_rule import TriggerRule
 from mock import patch
 from parameterized import parameterized
 
 
-DEFAULT_DATE = datetime.datetime(2016, 1, 1)
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 TEST_DAGS_FOLDER = os.path.join(
     os.path.dirname(os.path.realpath(__file__)), 'dags')
 
@@ -320,7 +322,7 @@ class DagStatTest(unittest.TestCase):
         with dag:
             op1 = DummyOperator(task_id='A')
 
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
         dr = dag.create_dagrun(
             run_id='manual__' + now.isoformat(),
             execution_date=now,
@@ -345,7 +347,7 @@ class DagStatTest(unittest.TestCase):
 class DagRunTest(unittest.TestCase):
 
     def create_dag_run(self, dag, state=State.RUNNING, task_states=None, 
execution_date=None):
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
         if execution_date is None:
             execution_date = now
         dag_run = dag.create_dagrun(
@@ -367,14 +369,14 @@ class DagRunTest(unittest.TestCase):
 
     def test_id_for_date(self):
         run_id = models.DagRun.id_for_date(
-            datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None))
+            timezone.datetime(2015, 1, 2, 3, 4, 5, 6))
         self.assertEqual(
             'scheduled__2015-01-02T03:04:05', run_id,
             'Generated run_id did not match expectations: {0}'.format(run_id))
 
     def test_dagrun_find(self):
         session = settings.Session()
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
 
         dag_id1 = "test_dagrun_find_externally_triggered"
         dag_run = models.DagRun(
@@ -411,7 +413,7 @@ class DagRunTest(unittest.TestCase):
         """
         dag = DAG(
             dag_id='test_dagrun_success_when_all_skipped',
-            start_date=datetime.datetime(2017, 1, 1)
+            start_date=timezone.datetime(2017, 1, 1)
         )
         dag_task1 = ShortCircuitOperator(
             task_id='test_short_circuit_false',
@@ -459,7 +461,7 @@ class DagRunTest(unittest.TestCase):
 
         dag.clear()
 
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
         dr = dag.create_dagrun(run_id='test_dagrun_success_conditions',
                                state=State.RUNNING,
                                execution_date=now,
@@ -498,7 +500,7 @@ class DagRunTest(unittest.TestCase):
             op2.set_upstream(op1)
 
         dag.clear()
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
         dr = dag.create_dagrun(run_id='test_dagrun_deadlock',
                                state=State.RUNNING,
                                execution_date=now,
@@ -556,7 +558,7 @@ class DagRunTest(unittest.TestCase):
         """
         dag = DAG(
             dag_id='test_get_task_instance_on_empty_dagrun',
-            start_date=datetime.datetime(2017, 1, 1)
+            start_date=timezone.datetime(2017, 1, 1)
         )
         dag_task1 = ShortCircuitOperator(
             task_id='test_short_circuit_false',
@@ -565,7 +567,7 @@ class DagRunTest(unittest.TestCase):
 
         session = settings.Session()
 
-        now = datetime.datetime.now()
+        now = timezone.utcnow()
 
         # Don't use create_dagrun since it will create the task instances too 
which we
         # don't want
@@ -589,14 +591,14 @@ class DagRunTest(unittest.TestCase):
             dag_id='test_latest_runs_1',
             start_date=DEFAULT_DATE)
         dag_1_run_1 = self.create_dag_run(dag,
-                execution_date=datetime.datetime(2015, 1, 1))
+                execution_date=timezone.datetime(2015, 1, 1))
         dag_1_run_2 = self.create_dag_run(dag,
-                execution_date=datetime.datetime(2015, 1, 2))
+                execution_date=timezone.datetime(2015, 1, 2))
         dagruns = models.DagRun.get_latest_runs(session)
         session.close()
         for dagrun in dagruns:
             if dagrun.dag_id == 'test_latest_runs_1':
-                self.assertEqual(dagrun.execution_date, 
datetime.datetime(2015, 1, 2))
+                self.assertEqual(dagrun.execution_date, 
timezone.datetime(2015, 1, 2))
 
     def test_is_backfill(self):
         dag = DAG(dag_id='test_is_backfill', start_date=DEFAULT_DATE)
@@ -835,7 +837,7 @@ class TaskInstanceTest(unittest.TestCase):
                   max_active_runs=1, concurrency=2)
         task = DummyOperator(task_id='test_requeue_over_concurrency_op', 
dag=dag)
 
-        ti = TI(task=task, execution_date=datetime.datetime.now())
+        ti = TI(task=task, execution_date=timezone.utcnow())
         ti.run()
         self.assertEqual(ti.state, models.State.NONE)
 
@@ -852,9 +854,9 @@ class TaskInstanceTest(unittest.TestCase):
         dag = models.DAG(dag_id='test_run_pooling_task')
         task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag,
                              pool='test_run_pooling_task_pool', 
owner='airflow',
-                             start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+                             start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
         ti = TI(
-            task=task, execution_date=datetime.datetime.now())
+            task=task, execution_date=timezone.utcnow())
         ti.run()
         self.assertEqual(ti.state, models.State.SUCCESS)
 
@@ -873,9 +875,9 @@ class TaskInstanceTest(unittest.TestCase):
             dag=dag,
             pool='test_run_pooling_task_with_mark_success_pool',
             owner='airflow',
-            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
         ti = TI(
-            task=task, execution_date=datetime.datetime.now())
+            task=task, execution_date=timezone.utcnow())
         ti.run(mark_success=True)
         self.assertEqual(ti.state, models.State.SUCCESS)
 
@@ -894,9 +896,9 @@ class TaskInstanceTest(unittest.TestCase):
             dag=dag,
             python_callable=raise_skip_exception,
             owner='airflow',
-            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
         ti = TI(
-            task=task, execution_date=datetime.datetime.now())
+            task=task, execution_date=timezone.utcnow())
         ti.run()
         self.assertEqual(models.State.SKIPPED, ti.state)
 
@@ -912,7 +914,7 @@ class TaskInstanceTest(unittest.TestCase):
             retry_delay=datetime.timedelta(seconds=3),
             dag=dag,
             owner='airflow',
-            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
 
         def run_with_error(ti):
             try:
@@ -921,7 +923,7 @@ class TaskInstanceTest(unittest.TestCase):
                 pass
 
         ti = TI(
-            task=task, execution_date=datetime.datetime.now())
+            task=task, execution_date=timezone.utcnow())
 
         # first run -- up for retry
         run_with_error(ti)
@@ -953,7 +955,7 @@ class TaskInstanceTest(unittest.TestCase):
             retry_delay=datetime.timedelta(seconds=0),
             dag=dag,
             owner='airflow',
-            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
 
         def run_with_error(ti):
             try:
@@ -962,7 +964,7 @@ class TaskInstanceTest(unittest.TestCase):
                 pass
 
         ti = TI(
-            task=task, execution_date=datetime.datetime.now())
+            task=task, execution_date=timezone.utcnow())
 
         # first run -- up for retry
         run_with_error(ti)
@@ -1002,25 +1004,28 @@ class TaskInstanceTest(unittest.TestCase):
             max_retry_delay=max_delay,
             dag=dag,
             owner='airflow',
-            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
         ti = TI(
             task=task, execution_date=DEFAULT_DATE)
-        ti.end_date = datetime.datetime.now()
+        ti.end_date = pendulum.instance(timezone.utcnow())
 
         ti.try_number = 1
         dt = ti.next_retry_datetime()
         # between 30 * 2^0.5 and 30 * 2^1 (15 and 30)
-        self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=20.0))
+        period = ti.end_date.add(seconds=30) - ti.end_date.add(seconds=15)
+        self.assertTrue(dt in period)
 
         ti.try_number = 4
         dt = ti.next_retry_datetime()
         # between 30 * 2^2 and 30 * 2^3 (120 and 240)
-        self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=181.0))
+        period = ti.end_date.add(seconds=240) - ti.end_date.add(seconds=120)
+        self.assertTrue(dt in period)
 
         ti.try_number = 6
         dt = ti.next_retry_datetime()
         # between 30 * 2^4 and 30 * 2^5 (480 and 960)
-        self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=825.0))
+        period = ti.end_date.add(seconds=960) - ti.end_date.add(seconds=480)
+        self.assertTrue(dt in period)
 
         ti.try_number = 9
         dt = ti.next_retry_datetime()
@@ -1099,7 +1104,7 @@ class TaskInstanceTest(unittest.TestCase):
                                      failed, upstream_failed, done,
                                      flag_upstream_failed,
                                      expect_state, expect_completed):
-        start_date = datetime.datetime(2016, 2, 1, 0, 0, 0)
+        start_date = timezone.datetime(2016, 2, 1, 0, 0, 0)
         dag = models.DAG('test-dag', start_date=start_date)
         downstream = DummyOperator(task_id='downstream',
                                    dag=dag, owner='airflow',
@@ -1137,8 +1142,8 @@ class TaskInstanceTest(unittest.TestCase):
             dag=dag,
             pool='test_xcom',
             owner='airflow',
-            start_date=datetime.datetime(2016, 6, 2, 0, 0, 0))
-        exec_date = datetime.datetime.now()
+            start_date=timezone.datetime(2016, 6, 2, 0, 0, 0))
+        exec_date = timezone.utcnow()
         ti = TI(
             task=task, execution_date=exec_date)
         ti.run(mark_success=True)
@@ -1171,8 +1176,8 @@ class TaskInstanceTest(unittest.TestCase):
             dag=dag,
             pool='test_xcom',
             owner='airflow',
-            start_date=datetime.datetime(2016, 6, 2, 0, 0, 0))
-        exec_date = datetime.datetime.now()
+            start_date=timezone.datetime(2016, 6, 2, 0, 0, 0))
+        exec_date = timezone.utcnow()
         ti = TI(
             task=task, execution_date=exec_date)
         ti.run(mark_success=True)
@@ -1213,8 +1218,8 @@ class TaskInstanceTest(unittest.TestCase):
             dag=dag,
             python_callable=lambda: 'error',
             owner='airflow',
-            start_date=datetime.datetime(2017, 2, 1))
-        ti = TI(task=task, execution_date=datetime.datetime.now())
+            start_date=timezone.datetime(2017, 2, 1))
+        ti = TI(task=task, execution_date=timezone.utcnow())
 
         with self.assertRaises(TestError):
             ti.run()
@@ -1223,7 +1228,7 @@ class TaskInstanceTest(unittest.TestCase):
         dag = models.DAG(dag_id='test_check_and_change_state_before_execution')
         task = DummyOperator(task_id='task', dag=dag, start_date=DEFAULT_DATE)
         ti = TI(
-            task=task, execution_date=datetime.datetime.now())
+            task=task, execution_date=timezone.utcnow())
         self.assertTrue(ti._check_and_change_state_before_execution())
 
     def test_check_and_change_state_before_execution_dep_not_met(self):
@@ -1232,7 +1237,7 @@ class TaskInstanceTest(unittest.TestCase):
         task2= DummyOperator(task_id='task2', dag=dag, start_date=DEFAULT_DATE)
         task >> task2
         ti = TI(
-            task=task2, execution_date=datetime.datetime.now())
+            task=task2, execution_date=timezone.utcnow())
         self.assertFalse(ti._check_and_change_state_before_execution())
 
     def test_get_num_running_task_instances(self):
@@ -1257,7 +1262,7 @@ class TaskInstanceTest(unittest.TestCase):
         self.assertEquals(1, 
ti1.get_num_running_task_instances(session=session))
         self.assertEquals(1, 
ti2.get_num_running_task_instances(session=session))
         self.assertEquals(1, 
ti3.get_num_running_task_instances(session=session))
-        
+
 
 class ClearTasksTest(unittest.TestCase):
     def test_clear_task_instances(self):
@@ -1457,7 +1462,7 @@ class ClearTasksTest(unittest.TestCase):
 
     def test_xcom_disable_pickle_type(self):
         json_obj = {"key": "value"}
-        execution_date = datetime.datetime.now()
+        execution_date = timezone.utcnow()
         key = "xcom_test1"
         dag_id = "test_dag1"
         task_id = "test_task1"
@@ -1479,7 +1484,7 @@ class ClearTasksTest(unittest.TestCase):
 
     def test_xcom_enable_pickle_type(self):
         json_obj = {"key": "value"}
-        execution_date = datetime.datetime.now()
+        execution_date = timezone.utcnow()
         key = "xcom_test2"
         dag_id = "test_dag2"
         task_id = "test_task2"
@@ -1508,12 +1513,12 @@ class ClearTasksTest(unittest.TestCase):
                           value=PickleRce(),
                           dag_id="test_dag3",
                           task_id="test_task3",
-                          execution_date=datetime.datetime.now(),
+                          execution_date=timezone.utcnow(),
                           enable_pickling=False)
 
     def test_xcom_get_many(self):
         json_obj = {"key": "value"}
-        execution_date = datetime.datetime.now()
+        execution_date = timezone.utcnow()
         key = "xcom_test4"
         dag_id1 = "test_dag4"
         task_id1 = "test_task4"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/latest_only_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/latest_only_operator.py 
b/tests/operators/latest_only_operator.py
index 225d24f..44fff23 100644
--- a/tests/operators/latest_only_operator.py
+++ b/tests/operators/latest_only_operator.py
@@ -23,13 +23,14 @@ from airflow.jobs import BackfillJob
 from airflow.models import TaskInstance
 from airflow.operators.latest_only_operator import LatestOnlyOperator
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
 from airflow.utils.state import State
 from freezegun import freeze_time
 
-DEFAULT_DATE = datetime.datetime(2016, 1, 1)
-END_DATE = datetime.datetime(2016, 1, 2)
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+END_DATE = timezone.datetime(2016, 1, 2)
 INTERVAL = datetime.timedelta(hours=12)
-FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1)
+FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
 
 
 def get_task_instances(task_id):
@@ -85,27 +86,27 @@ class LatestOnlyOperatorTest(unittest.TestCase):
         exec_date_to_latest_state = {
             ti.execution_date: ti.state for ti in latest_instances}
         self.assertEqual({
-            datetime.datetime(2016, 1, 1): 'success',
-            datetime.datetime(2016, 1, 1, 12): 'success',
-            datetime.datetime(2016, 1, 2): 'success', },
+            timezone.datetime(2016, 1, 1): 'success',
+            timezone.datetime(2016, 1, 1, 12): 'success',
+            timezone.datetime(2016, 1, 2): 'success', },
             exec_date_to_latest_state)
 
         downstream_instances = get_task_instances('downstream')
         exec_date_to_downstream_state = {
             ti.execution_date: ti.state for ti in downstream_instances}
         self.assertEqual({
-            datetime.datetime(2016, 1, 1): 'skipped',
-            datetime.datetime(2016, 1, 1, 12): 'skipped',
-            datetime.datetime(2016, 1, 2): 'success',},
+            timezone.datetime(2016, 1, 1): 'skipped',
+            timezone.datetime(2016, 1, 1, 12): 'skipped',
+            timezone.datetime(2016, 1, 2): 'success',},
             exec_date_to_downstream_state)
 
         downstream_instances = get_task_instances('downstream_2')
         exec_date_to_downstream_state = {
             ti.execution_date: ti.state for ti in downstream_instances}
         self.assertEqual({
-            datetime.datetime(2016, 1, 1): 'skipped',
-            datetime.datetime(2016, 1, 1, 12): 'skipped',
-            datetime.datetime(2016, 1, 2): 'success',},
+            timezone.datetime(2016, 1, 1): 'skipped',
+            timezone.datetime(2016, 1, 1, 12): 'skipped',
+            timezone.datetime(2016, 1, 2): 'success',},
             exec_date_to_downstream_state)
 
     def test_skipping_dagrun(self):
@@ -124,21 +125,21 @@ class LatestOnlyOperatorTest(unittest.TestCase):
 
         dr1 = self.dag.create_dagrun(
             run_id="manual__1",
-            start_date=datetime.datetime.now(),
+            start_date=timezone.utcnow(),
             execution_date=DEFAULT_DATE,
             state=State.RUNNING
         )
 
         dr2 = self.dag.create_dagrun(
             run_id="manual__2",
-            start_date=datetime.datetime.now(),
-            execution_date=datetime.datetime(2016, 1, 1, 12),
+            start_date=timezone.utcnow(),
+            execution_date=timezone.datetime(2016, 1, 1, 12),
             state=State.RUNNING
         )
 
         dr2 = self.dag.create_dagrun(
             run_id="manual__3",
-            start_date=datetime.datetime.now(),
+            start_date=timezone.utcnow(),
             execution_date=END_DATE,
             state=State.RUNNING
         )
@@ -151,25 +152,25 @@ class LatestOnlyOperatorTest(unittest.TestCase):
         exec_date_to_latest_state = {
             ti.execution_date: ti.state for ti in latest_instances}
         self.assertEqual({
-            datetime.datetime(2016, 1, 1): 'success',
-            datetime.datetime(2016, 1, 1, 12): 'success',
-            datetime.datetime(2016, 1, 2): 'success', },
+            timezone.datetime(2016, 1, 1): 'success',
+            timezone.datetime(2016, 1, 1, 12): 'success',
+            timezone.datetime(2016, 1, 2): 'success', },
             exec_date_to_latest_state)
 
         downstream_instances = get_task_instances('downstream')
         exec_date_to_downstream_state = {
             ti.execution_date: ti.state for ti in downstream_instances}
         self.assertEqual({
-            datetime.datetime(2016, 1, 1): 'skipped',
-            datetime.datetime(2016, 1, 1, 12): 'skipped',
-            datetime.datetime(2016, 1, 2): 'success',},
+            timezone.datetime(2016, 1, 1): 'skipped',
+            timezone.datetime(2016, 1, 1, 12): 'skipped',
+            timezone.datetime(2016, 1, 2): 'success',},
             exec_date_to_downstream_state)
 
         downstream_instances = get_task_instances('downstream_2')
         exec_date_to_downstream_state = {
             ti.execution_date: ti.state for ti in downstream_instances}
         self.assertEqual({
-            datetime.datetime(2016, 1, 1): 'skipped',
-            datetime.datetime(2016, 1, 1, 12): 'skipped',
-            datetime.datetime(2016, 1, 2): 'success',},
+            timezone.datetime(2016, 1, 1): 'skipped',
+            timezone.datetime(2016, 1, 1, 12): 'skipped',
+            timezone.datetime(2016, 1, 2): 'success',},
             exec_date_to_downstream_state)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 0f5abd5..40f0ffd 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -14,16 +14,15 @@
 
 from __future__ import print_function
 
-import datetime
-
 from airflow import DAG, configuration, operators
 from airflow.utils.tests import skipUnlessImported
+from airflow.utils import timezone
 
 configuration.load_test_config()
 
 import unittest
 
-DEFAULT_DATE = datetime.datetime(2015, 1, 1)
+DEFAULT_DATE = timezone.datetime(2015, 1, 1)
 DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
 DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
 TEST_DAG_ID = 'unit_test_dag'
@@ -251,7 +250,7 @@ class TransferTests(unittest.TestCase):
     def test_clear(self):
         self.dag.clear(
             start_date=DEFAULT_DATE,
-            end_date=datetime.datetime.now())
+            end_date=timezone.utcnow())
 
     def test_mysql_to_hive(self):
         # import airflow.operators

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/python_operator.py b/tests/operators/python_operator.py
index 74120fe..6fa5e5a 100644
--- a/tests/operators/python_operator.py
+++ b/tests/operators/python_operator.py
@@ -23,15 +23,16 @@ from airflow.operators.python_operator import 
PythonOperator, BranchPythonOperat
 from airflow.operators.python_operator import ShortCircuitOperator
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.settings import Session
+from airflow.utils import timezone
 from airflow.utils.state import State
 
 from airflow.exceptions import AirflowException
 import logging
 
-DEFAULT_DATE = datetime.datetime(2016, 1, 1)
-END_DATE = datetime.datetime(2016, 1, 2)
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+END_DATE = timezone.datetime(2016, 1, 2)
 INTERVAL = datetime.timedelta(hours=12)
-FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1)
+FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
 
 
 class PythonOperatorTest(unittest.TestCase):
@@ -127,7 +128,7 @@ class BranchOperatorTest(unittest.TestCase):
     def test_with_dag_run(self):
         dr = self.dag.create_dagrun(
             run_id="manual__",
-            start_date=datetime.datetime.now(),
+            start_date=timezone.utcnow(),
             execution_date=DEFAULT_DATE,
             state=State.RUNNING
         )
@@ -225,7 +226,7 @@ class ShortCircuitOperatorTest(unittest.TestCase):
         logging.error("Tasks {}".format(dag.tasks))
         dr = dag.create_dagrun(
             run_id="manual__",
-            start_date=datetime.datetime.now(),
+            start_date=timezone.utcnow(),
             execution_date=DEFAULT_DATE,
             state=State.RUNNING
         )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
index ee67524..d09dabe 100644
--- a/tests/operators/sensors.py
+++ b/tests/operators/sensors.py
@@ -15,7 +15,7 @@ import logging
 import sys
 import time
 import unittest
-from datetime import datetime, timedelta
+from datetime import timedelta
 from mock import patch
 
 from airflow import DAG, configuration, settings
@@ -28,6 +28,8 @@ from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.sensors import HttpSensor, BaseSensorOperator, HdfsSensor, ExternalTaskSensor
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.state import State
+from airflow.utils import timezone
+from airflow.utils.timezone import datetime
 
 try:
     from unittest import mock
@@ -64,12 +66,12 @@ class TimeoutTestSensor(BaseSensorOperator):
         return self.return_value
 
     def execute(self, context):
-        started_at = datetime.now()
+        started_at = timezone.utcnow()
         time_jump = self.params.get('time_jump')
         while not self.poke(context):
             if time_jump:
                 started_at -= time_jump
-            if (datetime.now() - started_at).total_seconds() > self.timeout:
+            if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
                 if self.soft_fail:
                     raise AirflowSkipException('Snap. Time is OUT.')
                 else:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index 9224f63..026eb3c 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
 import os
 import unittest
 
@@ -25,14 +24,16 @@ from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.subdag_operator import SubDagOperator
 from airflow.jobs import BackfillJob
 from airflow.exceptions import AirflowException
+from airflow.utils.timezone import datetime
 
-DEFAULT_DATE = datetime.datetime(2016, 1, 1)
+DEFAULT_DATE = datetime(2016, 1, 1)
 
 default_args = dict(
     owner='airflow',
     start_date=DEFAULT_DATE,
 )
 
+
 class SubDagOperatorTests(unittest.TestCase):
 
     def test_subdag_name(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/test_virtualenv_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/test_virtualenv_operator.py b/tests/operators/test_virtualenv_operator.py
index fdd2742..03623a6 100644
--- a/tests/operators/test_virtualenv_operator.py
+++ b/tests/operators/test_virtualenv_operator.py
@@ -15,6 +15,7 @@
 from __future__ import print_function, unicode_literals
 
 import datetime
+
 import funcsigs
 import sys
 import unittest
@@ -25,15 +26,15 @@ from airflow import configuration, DAG
 from airflow.models import TaskInstance
 from airflow.operators.python_operator import PythonVirtualenvOperator
 from airflow.settings import Session
-from airflow.utils.state import State
+from airflow.utils import timezone
 
 from airflow.exceptions import AirflowException
 import logging
 
-DEFAULT_DATE = datetime.datetime(2016, 1, 1)
-END_DATE = datetime.datetime(2016, 1, 2)
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+END_DATE = timezone.datetime(2016, 1, 2)
 INTERVAL = datetime.timedelta(hours=12)
-FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1)
+FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
 
 
 class TestPythonVirtualenvOperator(unittest.TestCase):
@@ -185,7 +186,7 @@ class TestPythonVirtualenvOperator(unittest.TestCase):
     def test_nonimported_as_arg(self):
         def f(a):
             return None
-        self._run_as_operator(f, op_args=[datetime.datetime.now()])
+        self._run_as_operator(f, op_args=[datetime.datetime.utcnow()])
 
     def test_context(self):
         def f(**kwargs):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/ti_deps/deps/test_not_in_retry_period_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_not_in_retry_period_dep.py b/tests/ti_deps/deps/test_not_in_retry_period_dep.py
index 0f23aab..38502fb 100644
--- a/tests/ti_deps/deps/test_not_in_retry_period_dep.py
+++ b/tests/ti_deps/deps/test_not_in_retry_period_dep.py
@@ -13,13 +13,14 @@
 # limitations under the License.
 
 import unittest
-from datetime import datetime, timedelta
+from datetime import timedelta
 from freezegun import freeze_time
 from mock import Mock
 
 from airflow.models import TaskInstance
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
 from airflow.utils.state import State
+from airflow.utils.timezone import datetime
 
 
 class NotInRetryPeriodDepTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/ti_deps/deps/test_runnable_exec_date_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_runnable_exec_date_dep.py b/tests/ti_deps/deps/test_runnable_exec_date_dep.py
index e1a396c..28b285f 100644
--- a/tests/ti_deps/deps/test_runnable_exec_date_dep.py
+++ b/tests/ti_deps/deps/test_runnable_exec_date_dep.py
@@ -13,13 +13,12 @@
 # limitations under the License.
 
 import unittest
-from datetime import datetime
 from freezegun import freeze_time
 from mock import Mock
 
 from airflow.models import TaskInstance
 from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
-
+from airflow.utils.timezone import datetime
 
 class RunnableExecDateDepTest(unittest.TestCase):
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/utils/log/test_file_processor_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_file_processor_handler.py b/tests/utils/log/test_file_processor_handler.py
index defe623..8a3bbd2 100644
--- a/tests/utils/log/test_file_processor_handler.py
+++ b/tests/utils/log/test_file_processor_handler.py
@@ -17,7 +17,7 @@ import os
 import unittest
 
 from airflow.utils.log.file_processor_handler import FileProcessorHandler
-from datetime import datetime
+from airflow.utils import timezone
 from datetime import timedelta
 from freezegun import freeze_time
 
@@ -31,7 +31,7 @@ class TestFileProcessorHandler(unittest.TestCase):
         self.dag_dir = "/dags"
 
     def test_non_template(self):
-        date = datetime.utcnow().strftime("%Y-%m-%d")
+        date = timezone.utcnow().strftime("%Y-%m-%d")
         handler = FileProcessorHandler(base_log_folder=self.base_log_folder,
                                        filename_template=self.filename)
         handler.dag_dir = self.dag_dir
@@ -44,7 +44,7 @@ class TestFileProcessorHandler(unittest.TestCase):
         self.assertTrue(os.path.exists(os.path.join(path, "logfile")))
 
     def test_template(self):
-        date = datetime.utcnow().strftime("%Y-%m-%d")
+        date = timezone.utcnow().strftime("%Y-%m-%d")
         handler = FileProcessorHandler(base_log_folder=self.base_log_folder,
                                        filename_template=self.filename_template)
         handler.dag_dir = self.dag_dir
@@ -61,8 +61,8 @@ class TestFileProcessorHandler(unittest.TestCase):
                                        filename_template=self.filename)
         handler.dag_dir = self.dag_dir
 
-        date1 = (datetime.utcnow() + timedelta(days=1)).strftime("%Y-%m-%d")
-        date2 = (datetime.utcnow() + timedelta(days=2)).strftime("%Y-%m-%d")
+        date1 = (timezone.utcnow() + timedelta(days=1)).strftime("%Y-%m-%d")
+        date2 = (timezone.utcnow() + timedelta(days=2)).strftime("%Y-%m-%d")
 
         p1 = os.path.join(self.base_log_folder, date1, "log1")
         p2 = os.path.join(self.base_log_folder, date1, "log2")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/utils/log/test_s3_task_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py
index b1354cd..dc32b5a 100644
--- a/tests/utils/log/test_s3_task_handler.py
+++ b/tests/utils/log/test_s3_task_handler.py
@@ -12,12 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import datetime
 import mock
 import unittest
 
 from airflow import configuration
 from airflow.utils.log.s3_task_handler import S3TaskHandler
+from airflow.utils.timezone import datetime
 from airflow.hooks.S3_hook import S3Hook
 from airflow.models import TaskInstance, DAG
 from airflow.operators.dummy_operator import DummyOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/utils/test_dates.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_dates.py b/tests/utils/test_dates.py
index 50e76ba..199de4a 100644
--- a/tests/utils/test_dates.py
+++ b/tests/utils/test_dates.py
@@ -13,16 +13,18 @@
 # limitations under the License.
 
 from datetime import datetime, timedelta
+import pendulum
 import unittest
 
 from airflow.utils import dates
+from airflow.utils import timezone
 
 
 class Dates(unittest.TestCase):
 
     def test_days_ago(self):
-        today = datetime.today()
-        today_midnight = datetime.fromordinal(today.date().toordinal())
+        today = pendulum.today()
+        today_midnight = pendulum.instance(datetime.fromordinal(today.date().toordinal()))
 
         self.assertTrue(dates.days_ago(0) == today_midnight)
 
@@ -43,9 +45,9 @@ class Dates(unittest.TestCase):
 
     def test_parse_execution_date(self):
         execution_date_str_wo_ms = '2017-11-02 00:00:00'
-        execution_date_str_w_ms = '2017-11-05 16:18:30..989729'
-        bad_execution_date_str = '2017-11-06T00:00:00Z'
+        execution_date_str_w_ms = '2017-11-05 16:18:30.989729'
+        bad_execution_date_str = '2017-11-06TXX:00:00Z'
 
-        self.assertEqual(datetime(2017, 11, 2, 0, 0, 0), dates.parse_execution_date(execution_date_str_wo_ms))
-        self.assertEqual(datetime(2017, 11, 5, 16, 18, 30, 989729), dates.parse_execution_date(execution_date_str_w_ms))
+        self.assertEqual(timezone.datetime(2017, 11, 2, 0, 0, 0), dates.parse_execution_date(execution_date_str_wo_ms))
+        self.assertEqual(timezone.datetime(2017, 11, 5, 16, 18, 30, 989729), dates.parse_execution_date(execution_date_str_w_ms))
         self.assertRaises(ValueError, dates.parse_execution_date, bad_execution_date_str)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/utils/test_log_handlers.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 25faa7c..fd5006c 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -19,10 +19,10 @@ import mock
 import os
 import unittest
 
-from datetime import datetime
 from airflow.models import TaskInstance, DAG
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.timezone import datetime
 from airflow.utils.log.file_task_handler import FileTaskHandler
 
 DEFAULT_DATE = datetime(2016, 1, 1)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/www/api/experimental/test_endpoints.py
----------------------------------------------------------------------
diff --git a/tests/www/api/experimental/test_endpoints.py b/tests/www/api/experimental/test_endpoints.py
index 65a6f75..2c510a6 100644
--- a/tests/www/api/experimental/test_endpoints.py
+++ b/tests/www/api/experimental/test_endpoints.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import datetime, timedelta
+from datetime import timedelta
 import json
 import unittest
 from urllib.parse import quote_plus
@@ -21,6 +21,7 @@ from airflow import configuration
 from airflow.api.common.experimental.trigger_dag import trigger_dag
 from airflow.models import DagBag, DagRun, Pool, TaskInstance
 from airflow.settings import Session
+from airflow.utils.timezone import datetime, utcnow
 from airflow.www import app as application
 
 
@@ -75,7 +76,7 @@ class TestApiExperimental(unittest.TestCase):
         url_template = '/api/experimental/dags/{}/dag_runs'
         response = self.app.post(
             url_template.format('example_bash_operator'),
-            data=json.dumps({'run_id': 'my_run' + datetime.now().isoformat()}),
+            data=json.dumps({'run_id': 'my_run' + utcnow().isoformat()}),
             content_type="application/json"
         )
 
@@ -91,7 +92,7 @@ class TestApiExperimental(unittest.TestCase):
     def test_trigger_dag_for_date(self):
         url_template = '/api/experimental/dags/{}/dag_runs'
         dag_id = 'example_bash_operator'
-        hour_from_now = datetime.now() + timedelta(hours=1)
+        hour_from_now = utcnow() + timedelta(hours=1)
         execution_date = datetime(hour_from_now.year,
                                   hour_from_now.month,
                                   hour_from_now.day,
@@ -133,7 +134,7 @@ class TestApiExperimental(unittest.TestCase):
         url_template = '/api/experimental/dags/{}/dag_runs/{}/tasks/{}'
         dag_id = 'example_bash_operator'
         task_id = 'also_run_this'
-        execution_date = datetime.now().replace(microsecond=0)
+        execution_date = utcnow().replace(microsecond=0)
         datetime_string = quote_plus(execution_date.isoformat())
         wrong_datetime_string = quote_plus(
             datetime(1990, 1, 1, 1, 1, 1).isoformat()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/www/test_views.py
----------------------------------------------------------------------
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 4931487..a9bb28f 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -18,7 +18,6 @@ import os
 import shutil
 import tempfile
 import unittest
-from datetime import datetime
 import sys
 
 from airflow import models, configuration, settings
@@ -26,6 +25,7 @@ from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONF
 from airflow.models import DAG, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.settings import Session
+from airflow.utils.timezone import datetime
 from airflow.www import app as application
 from airflow import configuration as conf
 

Reply via email to