[AIRFLOW-1809] Update tests to use timezone aware objects
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9624f5f2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9624f5f2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9624f5f2 Branch: refs/heads/master Commit: 9624f5f24e00299c66adfd799d2be59fabd17f03 Parents: dcac3e9 Author: Bolke de Bruin <[email protected]> Authored: Wed Nov 22 16:09:50 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Mon Nov 27 15:54:27 2017 +0100 ---------------------------------------------------------------------- airflow/models.py | 7 +- airflow/utils/dates.py | 11 +-- airflow/utils/timezone.py | 18 +++- airflow/www/utils.py | 5 +- airflow/www/views.py | 43 ++++----- tests/api/client/test_local_client.py | 31 +------ tests/contrib/operators/test_fs_operator.py | 5 +- .../operators/test_jira_operator_test.py | 5 +- tests/contrib/operators/test_sftp_operator.py | 21 ++--- .../operators/test_spark_submit_operator.py | 9 +- tests/contrib/operators/test_ssh_operator.py | 12 +-- tests/contrib/sensors/test_jira_sensor_test.py | 6 +- tests/contrib/sensors/test_redis_sensor.py | 4 +- tests/core.py | 32 +++---- tests/dags/test_cli_triggered_dags.py | 4 +- tests/executors/dask_executor.py | 13 ++- tests/impersonation.py | 2 +- tests/jobs.py | 33 +++---- tests/models.py | 91 +++++++++++--------- tests/operators/latest_only_operator.py | 51 +++++------ tests/operators/operators.py | 7 +- tests/operators/python_operator.py | 11 +-- tests/operators/sensors.py | 8 +- tests/operators/subdag_operator.py | 5 +- tests/operators/test_virtualenv_operator.py | 11 +-- .../deps/test_not_in_retry_period_dep.py | 3 +- .../ti_deps/deps/test_runnable_exec_date_dep.py | 3 +- tests/utils/log/test_file_processor_handler.py | 10 +-- tests/utils/log/test_s3_task_handler.py | 2 +- tests/utils/test_dates.py | 14 +-- tests/utils/test_log_handlers.py | 2 +- tests/www/api/experimental/test_endpoints.py | 9 +- tests/www/test_views.py | 2 +- 33 files changed, 251 insertions(+), 239 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 33f3636..e93e8a8 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -2892,7 +2892,11 @@ class DAG(BaseDag, LoggingMixin): # set timezone if start_date and start_date.tzinfo: self.timezone = start_date.tzinfo - elif 'start_date' in self.default_args and self.default_args['start_date'].tzinfo: + elif 'start_date' in self.default_args: + if isinstance(self.default_args['start_date'], six.string_types): + self.default_args['start_date'] = ( + timezone.parse(self.default_args['start_date']) + ) self.timezone = self.default_args['start_date'].tzinfo else: self.timezone = settings.TIMEZONE @@ -3066,7 +3070,6 @@ class DAG(BaseDag, LoggingMixin): # in case of @once if not following: return dttm - if self.previous_schedule(following) != dttm: return following http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/utils/dates.py ---------------------------------------------------------------------- diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py index 7d0d9d9..dab2b0d 100644 --- a/airflow/utils/dates.py +++ b/airflow/utils/dates.py @@ -99,7 +99,7 @@ def date_range( return sorted(l) -def round_time(dt, delta, start_date=datetime.min): +def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)): """ Returns the datetime of the form start_date + i * delta which is closest to dt for any non-negative integer i. @@ -232,11 +232,4 @@ def parse_execution_date(execution_date_str): """ Parse execution date string to datetime object. """ - try: - # Execution date follows execution date format of scheduled executions, - # e.g. '2017-11-02 00:00:00' - return datetime.strptime(execution_date_str, '%Y-%m-%d %H:%M:%S') - except ValueError: - # Execution date follows execution date format of manually triggered executions, - # e.g. '2017-11-05 16:18:30..989729' - return datetime.strptime(execution_date_str, '%Y-%m-%d %H:%M:%S..%f') + return timezone.parse(execution_date_str) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/utils/timezone.py ---------------------------------------------------------------------- diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py index 5ae7802..e384a14 100644 --- a/airflow/utils/timezone.py +++ b/airflow/utils/timezone.py @@ -50,7 +50,13 @@ def utcnow(): :return: """ - return pendulum.utcnow() + # pendulum utcnow() is not used as that sets a TimezoneInfo object + # instead of a Timezone. This is not pickable and also creates issues + # when using replace() + d = dt.datetime.utcnow() + d = d.replace(tzinfo=utc) + + return d def convert_to_utc(value): @@ -94,7 +100,7 @@ def make_aware(value, timezone=None): return timezone.convert(value) else: # This may be wrong around DST changes! - return value.replace(tzinfo=timezone) + return value.astimezone(tz=timezone) def make_naive(value, timezone=None): @@ -136,3 +142,11 @@ def datetime(*args, **kwargs): kwargs['tzinfo'] = TIMEZONE return dt.datetime(*args, **kwargs) + + +def parse(string): + """ + Parse a time string and return an aware datetime + :param string: time string + """ + return pendulum.parse(string, tz=TIMEZONE) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/www/utils.py ---------------------------------------------------------------------- diff --git a/airflow/www/utils.py b/airflow/www/utils.py index aba85fa..a0833ee 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -21,7 +21,6 @@ from cgi import escape from io import BytesIO as IO import functools import gzip -import iso8601 import json import time @@ -34,6 +33,7 @@ from wtforms.compat import text_type from airflow import configuration, models, settings from airflow.utils.db import create_session +from airflow.utils import timezone from airflow.utils.json import AirflowJsonEncoder AUTHENTICATE = configuration.getboolean('webserver', 'AUTHENTICATE') @@ -255,8 +255,7 @@ def action_logging(f): dag_id=request.args.get('dag_id')) if 'execution_date' in request.args: - log.execution_date = iso8601.parse_date( - request.args.get('execution_date'), settings.TIMEZONE) + log.execution_date = timezone.parse(request.args.get('execution_date')) with create_session() as session: session.add(log) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index 550a7f8..5ecee42 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -22,11 +22,11 @@ import pkg_resources import socket from functools import wraps from datetime import timedelta -import dateutil.parser import copy import math import json import bleach +import pendulum from collections import defaultdict import inspect @@ -78,6 +78,7 @@ from airflow.utils.state import State from airflow.utils.db import create_session, provide_session from airflow.utils.helpers import alchemy_to_dict from airflow.utils.dates import infer_time_unit, scale_time_units, parse_execution_date +from airflow.utils.timezone import datetime from airflow.www import utils as wwwutils from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm from airflow.www.validators import GreaterEqualThan @@ -669,7 +670,7 @@ class Airflow(BaseView): dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') - dttm = dateutil.parser.parse(execution_date) + dttm = pendulum.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) dag = dagbag.get_dag(dag_id) task = copy.copy(dag.get_task(task_id)) @@ -705,7 +706,7 @@ class Airflow(BaseView): dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') - dttm = dateutil.parser.parse(execution_date) + dttm = pendulum.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) dag = dagbag.get_dag(dag_id) ti = session.query(models.TaskInstance).filter( @@ -746,7 +747,7 @@ class Airflow(BaseView): # Carrying execution_date through, even though it's irrelevant for # this context execution_date = request.args.get('execution_date') - dttm = dateutil.parser.parse(execution_date) + dttm = pendulum.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) dag = dagbag.get_dag(dag_id) @@ -823,7 +824,7 @@ class Airflow(BaseView): # Carrying execution_date through, even though it's irrelevant for # this context execution_date = request.args.get('execution_date') - dttm = dateutil.parser.parse(execution_date) + dttm = pendulum.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) dag = dagbag.get_dag(dag_id) if not dag or task_id not in dag.task_ids: @@ -863,7 +864,7 @@ class Airflow(BaseView): task = dag.get_task(task_id) execution_date = request.args.get('execution_date') - execution_date = dateutil.parser.parse(execution_date) + execution_date = pendulum.parse(execution_date) ignore_all_deps = request.args.get('ignore_all_deps') == "true" ignore_task_deps = request.args.get('ignore_task_deps') == "true" ignore_ti_state = request.args.get('ignore_ti_state') == "true" @@ -987,7 +988,7 @@ class Airflow(BaseView): dag = dagbag.get_dag(dag_id) execution_date = request.args.get('execution_date') - execution_date = dateutil.parser.parse(execution_date) + execution_date = pendulum.parse(execution_date) confirmed = request.args.get('confirmed') == "true" upstream = request.args.get('upstream') == "true" downstream = request.args.get('downstream') == "true" @@ -1018,7 +1019,7 @@ class Airflow(BaseView): confirmed = request.args.get('confirmed') == "true" dag = dagbag.get_dag(dag_id) - execution_date = dateutil.parser.parse(execution_date) + execution_date = pendulum.parse(execution_date) start_date = execution_date end_date = execution_date @@ -1062,7 +1063,7 @@ class Airflow(BaseView): flash('Invalid execution date', 'error') return redirect(origin) - execution_date = dateutil.parser.parse(execution_date) + execution_date = pendulum.parse(execution_date) dag = dagbag.get_dag(dag_id) if not dag: @@ -1099,7 +1100,7 @@ class Airflow(BaseView): task.dag = dag execution_date = request.args.get('execution_date') - execution_date = dateutil.parser.parse(execution_date) + execution_date = pendulum.parse(execution_date) confirmed = request.args.get('confirmed') == "true" upstream = request.args.get('upstream') == "true" downstream = request.args.get('downstream') == "true" @@ -1160,7 +1161,7 @@ class Airflow(BaseView): num_runs = int(num_runs) if num_runs else 25 if base_date: - base_date = dateutil.parser.parse(base_date) + base_date = pendulum.parse(base_date) else: base_date = dag.latest_execution_date or timezone.utcnow() @@ -1218,7 +1219,7 @@ class Airflow(BaseView): def set_duration(tid): if (isinstance(tid, dict) and tid.get("state") == State.RUNNING and tid["start_date"] is not None): - d = timezone.utcnow() - dateutil.parser.parse(tid["start_date"]) + d = timezone.utcnow() - pendulum.parse(tid["start_date"]) tid["duration"] = d.total_seconds() return tid @@ -1313,9 +1314,9 @@ class Airflow(BaseView): dttm = request.args.get('execution_date') if dttm: - dttm = dateutil.parser.parse(dttm) + dttm = pendulum.parse(dttm) else: - dttm = dag.latest_execution_date or timezone.utcnow().date() + dttm = dag.latest_execution_date or timezone.utcnow() DR = models.DagRun drs = ( @@ -1389,7 +1390,7 @@ class Airflow(BaseView): num_runs = int(num_runs) if num_runs else 25 if base_date: - base_date = dateutil.parser.parse(base_date) + base_date = pendulum.parse(base_date) else: base_date = dag.latest_execution_date or timezone.utcnow() @@ -1496,7 +1497,7 @@ class Airflow(BaseView): num_runs = int(num_runs) if num_runs else 25 if base_date: - base_date = dateutil.parser.parse(base_date) + base_date = pendulum.parse(base_date) else: base_date = dag.latest_execution_date or timezone.utcnow() @@ -1559,7 +1560,7 @@ class Airflow(BaseView): num_runs = int(num_runs) if num_runs else 25 if base_date: - base_date = dateutil.parser.parse(base_date) + base_date = pendulum.parse(base_date) else: base_date = dag.latest_execution_date or timezone.utcnow() @@ -1686,9 +1687,9 @@ class Airflow(BaseView): dttm = request.args.get('execution_date') if dttm: - dttm = dateutil.parser.parse(dttm) + dttm = pendulum.parse(dttm) else: - dttm = dag.latest_execution_date or timezone.utcnow().date() + dttm = dag.latest_execution_date or timezone.utcnow() form = DateTimeForm(data={'execution_date': dttm}) @@ -1741,7 +1742,7 @@ class Airflow(BaseView): dttm = request.args.get('execution_date') if dttm: - dttm = dateutil.parser.parse(dttm) + dttm = pendulum.parse(dttm) else: return ("Error: Invalid execution_date") @@ -2586,7 +2587,7 @@ class TaskInstanceModelView(ModelViewOnly): https://github.com/flask-admin/flask-admin/issues/1226 """ task_id, dag_id, execution_date = iterdecode(id) - execution_date = dateutil.parser.parse(execution_date) + execution_date = pendulum.parse(execution_date) return self.session.query(self.model).get((task_id, dag_id, execution_date)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/api/client/test_local_client.py ---------------------------------------------------------------------- diff --git a/tests/api/client/test_local_client.py b/tests/api/client/test_local_client.py index 7a759fe..31a1712 100644 --- a/tests/api/client/test_local_client.py +++ b/tests/api/client/test_local_client.py @@ -12,45 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime import json import unittest +from freezegun import freeze_time from mock import patch from airflow import AirflowException from airflow.api.client.local_client import Client from airflow import models from airflow import settings +from airflow.utils import timezone from airflow.utils.state import State -EXECDATE = datetime.datetime.now() +EXECDATE = timezone.utcnow() EXECDATE_NOFRACTIONS = EXECDATE.replace(microsecond=0) EXECDATE_ISO = EXECDATE_NOFRACTIONS.isoformat() -real_datetime_class = datetime.datetime - - -def mock_datetime_now(target, dt): - class DatetimeSubclassMeta(type): - @classmethod - def __instancecheck__(mcs, obj): - return isinstance(obj, real_datetime_class) - - class BaseMockedDatetime(real_datetime_class): - @classmethod - def now(cls, tz=None): - return target.replace(tzinfo=tz) - - @classmethod - def utcnow(cls): - return target - - # Python2 & Python3 compatible metaclass - MockedDatetime = DatetimeSubclassMeta('datetime', (BaseMockedDatetime,), {}) - - return patch.object(dt, 'datetime', MockedDatetime) - class TestLocalClient(unittest.TestCase): @@ -81,8 +59,7 @@ class TestLocalClient(unittest.TestCase): with self.assertRaises(AirflowException): client.trigger_dag(dag_id="blablabla") - import airflow.api.common.experimental.trigger_dag - with mock_datetime_now(EXECDATE, airflow.api.common.experimental.trigger_dag.datetime): + with freeze_time(EXECDATE): # no execution date, execution date should be set automatically client.trigger_dag(dag_id="test_start_date_scheduling") mock.assert_called_once_with(run_id="manual__{0}".format(EXECDATE_ISO), http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_fs_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_fs_operator.py b/tests/contrib/operators/test_fs_operator.py index f990157..2ef4286 100644 --- a/tests/contrib/operators/test_fs_operator.py +++ b/tests/contrib/operators/test_fs_operator.py @@ -14,12 +14,12 @@ # import unittest -from datetime import datetime from airflow import configuration from airflow.settings import Session from airflow import models, DAG from airflow.contrib.operators.fs_operator import FileSensor +from airflow.utils.timezone import datetime TEST_DAG_ID = 'unit_tests' DEFAULT_DATE = datetime(2015, 1, 1) @@ -33,8 +33,10 @@ def reset(dag_id=TEST_DAG_ID): session.commit() session.close() + reset() + class FileSensorTest(unittest.TestCase): def setUp(self): configuration.load_test_config() @@ -60,5 +62,6 @@ class FileSensorTest(unittest.TestCase): ) task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + if __name__ == '__main__': unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_jira_operator_test.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_jira_operator_test.py b/tests/contrib/operators/test_jira_operator_test.py index 6d615df..566cca4 100644 --- a/tests/contrib/operators/test_jira_operator_test.py +++ b/tests/contrib/operators/test_jira_operator_test.py @@ -14,7 +14,7 @@ # import unittest -import datetime + from mock import Mock from mock import patch @@ -22,8 +22,9 @@ from airflow import DAG, configuration from airflow.contrib.operators.jira_operator import JiraOperator from airflow import models from airflow.utils import db +from airflow.utils import timezone -DEFAULT_DATE = datetime.datetime(2017, 1, 1) +DEFAULT_DATE = timezone.datetime(2017, 1, 1) jira_client_mock = Mock( name="jira_client_for_test" ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_sftp_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_sftp_operator.py b/tests/contrib/operators/test_sftp_operator.py index 39e8d88..81e0c9e 100644 --- a/tests/contrib/operators/test_sftp_operator.py +++ b/tests/contrib/operators/test_sftp_operator.py @@ -15,7 +15,6 @@ import os import unittest from base64 import b64encode -from datetime import datetime from airflow import configuration from airflow import models @@ -23,6 +22,8 @@ from airflow.contrib.operators.sftp_operator import SFTPOperator, SFTPOperation from airflow.contrib.operators.ssh_operator import SSHOperator from airflow.models import DAG, TaskInstance from airflow.settings import Session +from airflow.utils import timezone +from airflow.utils.timezone import datetime TEST_DAG_ID = 'unit_tests' DEFAULT_DATE = datetime(2017, 1, 1) @@ -80,7 +81,7 @@ class SFTPOperatorTest(unittest.TestCase): dag=self.dag ) self.assertIsNotNone(put_test_task) - ti2 = TaskInstance(task=put_test_task, execution_date=datetime.now()) + ti2 = TaskInstance(task=put_test_task, execution_date=timezone.utcnow()) ti2.run() # check the remote file content @@ -92,7 +93,7 @@ class SFTPOperatorTest(unittest.TestCase): dag=self.dag ) self.assertIsNotNone(check_file_task) - ti3 = TaskInstance(task=check_file_task, execution_date=datetime.now()) + ti3 = TaskInstance(task=check_file_task, execution_date=timezone.utcnow()) ti3.run() self.assertEqual( ti3.xcom_pull(task_ids='test_check_file', key='return_value').strip(), @@ -117,7 +118,7 @@ class SFTPOperatorTest(unittest.TestCase): dag=self.dag ) self.assertIsNotNone(put_test_task) - ti2 = TaskInstance(task=put_test_task, execution_date=datetime.now()) + ti2 = TaskInstance(task=put_test_task, execution_date=timezone.utcnow()) ti2.run() # check the remote file content @@ -129,7 +130,7 @@ class SFTPOperatorTest(unittest.TestCase): dag=self.dag ) self.assertIsNotNone(check_file_task) - ti3 = TaskInstance(task=check_file_task, execution_date=datetime.now()) + ti3 = TaskInstance(task=check_file_task, execution_date=timezone.utcnow()) ti3.run() self.assertEqual( ti3.xcom_pull(task_ids='test_check_file', key='return_value').strip(), @@ -152,7 +153,7 @@ class SFTPOperatorTest(unittest.TestCase): dag=self.dag ) self.assertIsNotNone(create_file_task) - ti1 = TaskInstance(task=create_file_task, execution_date=datetime.now()) + ti1 = TaskInstance(task=create_file_task, execution_date=timezone.utcnow()) ti1.run() # get remote file to local @@ -165,7 +166,7 @@ class SFTPOperatorTest(unittest.TestCase): dag=self.dag ) self.assertIsNotNone(get_test_task) - ti2 = TaskInstance(task=get_test_task, execution_date=datetime.now()) + ti2 = TaskInstance(task=get_test_task, execution_date=timezone.utcnow()) ti2.run() # test the received content @@ -190,7 +191,7 @@ class SFTPOperatorTest(unittest.TestCase): dag=self.dag ) self.assertIsNotNone(create_file_task) - ti1 = TaskInstance(task=create_file_task, execution_date=datetime.now()) + ti1 = TaskInstance(task=create_file_task, execution_date=timezone.utcnow()) ti1.run() # get remote file to local @@ -203,7 +204,7 @@ class SFTPOperatorTest(unittest.TestCase): dag=self.dag ) self.assertIsNotNone(get_test_task) - ti2 = TaskInstance(task=get_test_task, execution_date=datetime.now()) + ti2 = TaskInstance(task=get_test_task, execution_date=timezone.utcnow()) ti2.run() # test the received content @@ -227,7 +228,7 @@ class SFTPOperatorTest(unittest.TestCase): dag=self.dag ) self.assertIsNotNone(remove_file_task) - ti3 = TaskInstance(task=remove_file_task, execution_date=datetime.now()) + ti3 = TaskInstance(task=remove_file_task, execution_date=timezone.utcnow()) ti3.run() def tearDown(self): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_spark_submit_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_spark_submit_operator.py b/tests/contrib/operators/test_spark_submit_operator.py index 0731da9..4e72eea 100644 --- a/tests/contrib/operators/test_spark_submit_operator.py +++ b/tests/contrib/operators/test_spark_submit_operator.py @@ -14,15 +14,17 @@ # import unittest -import datetime import sys from airflow import DAG, configuration from airflow.models import TaskInstance from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator +from airflow.utils import timezone -DEFAULT_DATE = datetime.datetime(2017, 1, 1) +from datetime import timedelta + +DEFAULT_DATE = timezone.datetime(2017, 1, 1) class TestSparkSubmitOperator(unittest.TestCase): @@ -146,7 +148,7 @@ class TestSparkSubmitOperator(unittest.TestCase): # Then expected_application_args = [u'-f', 'foo', u'--bar', 'bar', - u'--start', (DEFAULT_DATE - datetime.timedelta(days=1)).strftime("%Y-%m-%d"), + u'--start', (DEFAULT_DATE - timedelta(days=1)).strftime("%Y-%m-%d"), u'--end', DEFAULT_DATE.strftime("%Y-%m-%d"), u'--with-spaces', u'args should keep embdedded spaces', ] @@ -154,5 +156,6 @@ class TestSparkSubmitOperator(unittest.TestCase): self.assertListEqual(expected_application_args, getattr(operator, '_application_args')) self.assertEqual(expected_name, getattr(operator, '_name')) + if __name__ == '__main__': unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/operators/test_ssh_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_ssh_operator.py b/tests/contrib/operators/test_ssh_operator.py index 019dfe4..4cec913 100644 --- a/tests/contrib/operators/test_ssh_operator.py +++ b/tests/contrib/operators/test_ssh_operator.py @@ -14,13 +14,14 @@ import unittest from base64 import b64encode -from datetime import datetime from airflow import configuration from airflow import models from airflow.contrib.operators.ssh_operator import SSHOperator from airflow.models import DAG, TaskInstance from airflow.settings import Session +from airflow.utils import timezone +from airflow.utils.timezone import datetime TEST_DAG_ID = 'unit_tests' DEFAULT_DATE = datetime(2017, 1, 1) @@ -65,7 +66,7 @@ class SSHOperatorTest(unittest.TestCase): self.assertIsNotNone(task) ti = TaskInstance( - task=task, execution_date=datetime.now()) + task=task, execution_date=timezone.utcnow()) ti.run() self.assertIsNotNone(ti.duration) self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), @@ -84,7 +85,7 @@ class SSHOperatorTest(unittest.TestCase): self.assertIsNotNone(task) ti = TaskInstance( - task=task, execution_date=datetime.now()) + task=task, execution_date=timezone.utcnow()) ti.run() self.assertIsNotNone(ti.duration) self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), b'airflow') @@ -102,7 +103,7 @@ class SSHOperatorTest(unittest.TestCase): self.assertIsNotNone(task) ti = TaskInstance( - task=task, execution_date=datetime.now()) + task=task, execution_date=timezone.utcnow()) ti.run() self.assertIsNotNone(ti.duration) self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), b'airflow') @@ -120,10 +121,11 @@ class SSHOperatorTest(unittest.TestCase): self.assertIsNotNone(task) ti = TaskInstance( - task=task, execution_date=datetime.now()) + task=task, execution_date=timezone.utcnow()) ti.run() self.assertIsNotNone(ti.duration) self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), b'') + if __name__ == '__main__': unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/sensors/test_jira_sensor_test.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_jira_sensor_test.py b/tests/contrib/sensors/test_jira_sensor_test.py index 77ca97f..7c16188 100644 --- a/tests/contrib/sensors/test_jira_sensor_test.py +++ b/tests/contrib/sensors/test_jira_sensor_test.py @@ -14,16 +14,16 @@ # import unittest -import datetime + from mock import Mock from mock import patch from airflow import DAG, configuration from airflow.contrib.sensors.jira_sensor import JiraTicketSensor from airflow import models -from airflow.utils import db +from airflow.utils import db, timezone -DEFAULT_DATE = datetime.datetime(2017, 1, 1) +DEFAULT_DATE = timezone.datetime(2017, 1, 1) jira_client_mock = Mock( name="jira_client_for_test" ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/contrib/sensors/test_redis_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_redis_sensor.py b/tests/contrib/sensors/test_redis_sensor.py index 8022a92..d627501 100644 --- a/tests/contrib/sensors/test_redis_sensor.py +++ b/tests/contrib/sensors/test_redis_sensor.py @@ -14,15 +14,15 @@ import unittest -import datetime from mock import patch from airflow import DAG from airflow import configuration from airflow.contrib.sensors.redis_key_sensor import RedisKeySensor +from airflow.utils import timezone -DEFAULT_DATE = datetime.datetime(2017, 1, 1) +DEFAULT_DATE = timezone.datetime(2017, 1, 1) class TestRedisSensor(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index 0bd0c87..a57f0ed 100644 --- a/tests/core.py +++ b/tests/core.py @@ -25,9 +25,10 @@ import multiprocessing import mock from numpy.testing import assert_array_almost_equal import tempfile -from datetime import datetime, time, timedelta +from datetime import time, timedelta from email.mime.multipart import MIMEMultipart from email.mime.application import MIMEApplication +from freezegun import freeze_time import signal from six.moves.urllib.parse import urlencode from time import sleep @@ -39,7 +40,6 @@ import sqlalchemy from airflow import configuration from airflow.executors import SequentialExecutor from airflow.models import Variable -from tests.test_utils.fake_datetime import FakeDatetime configuration.load_test_config() from airflow import jobs, models, DAG, utils, macros, settings, exceptions @@ -56,6 +56,8 @@ from airflow.hooks.sqlite_hook import SqliteHook from airflow.bin import cli from airflow.www import app as application from airflow.settings import Session +from airflow.utils import timezone +from airflow.utils.timezone import datetime from airflow.utils.state import State from airflow.utils.dates import infer_time_unit, round_time, scale_time_units from lxml import html @@ -208,7 +210,7 @@ class CoreTest(unittest.TestCase): owner='Also fake', start_date=datetime(2015, 1, 2, 0, 0))) - start_date = datetime.utcnow() + start_date = timezone.utcnow() run = dag.create_dagrun( run_id='test_' + start_date.isoformat(), @@ -254,7 +256,7 @@ class CoreTest(unittest.TestCase): self.assertIsNone(additional_dag_run) - @mock.patch('airflow.jobs.datetime', FakeDatetime) + @freeze_time('2016-01-01') def test_schedule_dag_no_end_date_up_to_today_only(self): """ Tests that a Dag created without an end_date can only be scheduled up @@ -264,9 +266,6 @@ class CoreTest(unittest.TestCase): start_date of 2015-01-01, only jobs up to, but not including 2016-01-01 should be scheduled. """ - from datetime import datetime - FakeDatetime.utcnow = classmethod(lambda cls: datetime(2016, 1, 1)) - session = settings.Session() delta = timedelta(days=1) start_date = DEFAULT_DATE @@ -332,7 +331,8 @@ class CoreTest(unittest.TestCase): self.assertNotEqual(dag_subclass, self.dag) # a dag should equal an unpickled version of itself - self.assertEqual(pickle.loads(pickle.dumps(self.dag)), self.dag) + d = pickle.dumps(self.dag) + self.assertEqual(pickle.loads(d), self.dag) # dags are ordered based on dag_id no matter what the type is self.assertLess(self.dag, dag_diff_name) @@ -1637,7 +1637,7 @@ class SecurityTests(unittest.TestCase): def tearDown(self): configuration.conf.set("webserver", "expose_config", "False") - self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=datetime.utcnow()) + self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=timezone.utcnow()) class WebUiTests(unittest.TestCase): def setUp(self): @@ -1657,23 +1657,23 @@ class WebUiTests(unittest.TestCase): self.example_xcom = self.dagbag.dags['example_xcom'] self.dagrun_bash2 = self.dag_bash2.create_dagrun( - run_id="test_{}".format(models.DagRun.id_for_date(datetime.utcnow())), + run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())), execution_date=DEFAULT_DATE, - start_date=datetime.utcnow(), + start_date=timezone.utcnow(), state=State.RUNNING ) self.sub_dag.create_dagrun( - run_id="test_{}".format(models.DagRun.id_for_date(datetime.utcnow())), + run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())), execution_date=DEFAULT_DATE, - start_date=datetime.utcnow(), + start_date=timezone.utcnow(), state=State.RUNNING ) self.example_xcom.create_dagrun( - run_id="test_{}".format(models.DagRun.id_for_date(datetime.utcnow())), + run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())), execution_date=DEFAULT_DATE, - start_date=datetime.utcnow(), + start_date=timezone.utcnow(), state=State.RUNNING ) @@ -1849,7 +1849,7 @@ class WebUiTests(unittest.TestCase): def tearDown(self): configuration.conf.set("webserver", "expose_config", "False") - self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=datetime.utcnow()) + self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=timezone.utcnow()) session = Session() session.query(models.DagRun).delete() session.query(models.TaskInstance).delete() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/dags/test_cli_triggered_dags.py ---------------------------------------------------------------------- diff --git a/tests/dags/test_cli_triggered_dags.py b/tests/dags/test_cli_triggered_dags.py index 5af8fc8..94afe0e 100644 --- a/tests/dags/test_cli_triggered_dags.py +++ b/tests/dags/test_cli_triggered_dags.py @@ -13,9 +13,11 @@ # limitations under the License. -from datetime import datetime, timedelta +from datetime import timedelta + from airflow.models import DAG from airflow.operators.python_operator import PythonOperator +from airflow.utils.timezone import datetime DEFAULT_DATE = datetime(2016, 1, 1) default_args = dict( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/executors/dask_executor.py ---------------------------------------------------------------------- diff --git a/tests/executors/dask_executor.py b/tests/executors/dask_executor.py index f66a272..decd663 100644 --- a/tests/executors/dask_executor.py +++ b/tests/executors/dask_executor.py @@ -12,15 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime import logging -import time import unittest from airflow import configuration from airflow.models import DAG, DagBag, TaskInstance, State from airflow.jobs import BackfillJob -from airflow.operators.python_operator import PythonOperator +from airflow.utils import timezone + +from datetime import timedelta try: from airflow.executors.dask_executor import DaskExecutor @@ -34,7 +34,7 @@ if 'sqlite' in configuration.get('core', 'sql_alchemy_conn'): logging.error('sqlite does not support concurrent access') SKIP_DASK = True -DEFAULT_DATE = datetime.datetime(2017, 1, 1) +DEFAULT_DATE = timezone.datetime(2017, 1, 1) class DaskExecutorTest(unittest.TestCase): @@ -63,9 +63,9 @@ class DaskExecutorTest(unittest.TestCase): k for k, v in executor.futures.items() if v == 'fail') # wait for the futures to execute, with a timeout - timeout = datetime.datetime.now() + datetime.timedelta(seconds=30) + timeout = timezone.utcnow() + timedelta(seconds=30) while not (success_future.done() and fail_future.done()): - if datetime.datetime.now() > timeout: + if timezone.utcnow() > timeout: raise ValueError( 'The futures should have finished; there is probably ' 'an error communciating with the Dask cluster.') @@ -80,7 +80,6 @@ class DaskExecutorTest(unittest.TestCase): cluster.close() - @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration') def test_backfill_integration(self): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/impersonation.py ---------------------------------------------------------------------- diff --git a/tests/impersonation.py b/tests/impersonation.py index 0777def..5355c9a 100644 --- a/tests/impersonation.py +++ b/tests/impersonation.py @@ -19,7 +19,7 @@ import unittest from airflow import jobs, models from airflow.utils.state import State -from datetime import datetime +from airflow.utils.timezone import datetime DEV_NULL = '/dev/null' TEST_DAG_FOLDER = os.path.join( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 119e1b4..ca2db2c 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -37,6 +37,7 @@ from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as from airflow.operators.dummy_operator import DummyOperator from airflow.operators.bash_operator import BashOperator from airflow.task_runner.base_task_runner import BaseTaskRunner +from airflow.utils import timezone from airflow.utils.dates import days_ago from airflow.utils.db import provide_session from airflow.utils.state import State @@ -63,7 +64,7 @@ except ImportError: mock = None DEV_NULL = '/dev/null' -DEFAULT_DATE = datetime.datetime(2016, 1, 1) +DEFAULT_DATE = timezone.datetime(2016, 1, 1) # Include the words "airflow" and "dag" in the file contents, tricking airflow into thinking these # files contain a DAG (otherwise Airflow will skip them) @@ -659,7 +660,7 @@ class BackfillJobTest(unittest.TestCase): subdag = subdag_op_task.subdag subdag.schedule_interval = '@daily' - start_date = datetime.datetime.now() + start_date = timezone.utcnow() executor = TestExecutor(do_update=True) job = BackfillJob(dag=subdag, start_date=start_date, @@ -1838,7 +1839,7 @@ class SchedulerJobTest(unittest.TestCase): """ dag = DAG( 'test_scheduler_dagrun_once', - start_date=datetime.datetime(2015, 1, 1), + start_date=timezone.datetime(2015, 1, 1), schedule_interval="@once") scheduler = SchedulerJob() @@ -1912,7 +1913,7 @@ class SchedulerJobTest(unittest.TestCase): def test_scheduler_do_not_schedule_too_early(self): dag = DAG( dag_id='test_scheduler_do_not_schedule_too_early', - start_date=datetime.datetime(2200, 1, 1)) + start_date=timezone.datetime(2200, 1, 1)) dag_task1 = DummyOperator( task_id='dummy', dag=dag, @@ -2059,7 +2060,7 @@ class SchedulerJobTest(unittest.TestCase): dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) - dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1) + dr.start_date = timezone.utcnow() - datetime.timedelta(days=1) session.merge(dr) session.commit() @@ -2102,7 +2103,7 @@ class SchedulerJobTest(unittest.TestCase): self.assertIsNone(new_dr) # Should be scheduled as dagrun_timeout has passed - dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1) + dr.start_date = timezone.utcnow() - datetime.timedelta(days=1) session.merge(dr) session.commit() new_dr = scheduler.create_dag_run(dag) @@ -2213,7 +2214,7 @@ class SchedulerJobTest(unittest.TestCase): """ dag = DAG( dag_id='test_scheduler_auto_align_1', - start_date=datetime.datetime(2016, 1, 1, 10, 10, 0), + start_date=timezone.datetime(2016, 1, 1, 10, 10, 0), schedule_interval="4 5 * * *" ) dag_task1 = DummyOperator( @@ -2231,11 +2232,11 @@ class SchedulerJobTest(unittest.TestCase): dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) - self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 2, 5, 4)) + self.assertEquals(dr.execution_date, timezone.datetime(2016, 1, 2, 5, 4)) dag = DAG( dag_id='test_scheduler_auto_align_2', - start_date=datetime.datetime(2016, 1, 1, 10, 10, 0), + start_date=timezone.datetime(2016, 1, 1, 10, 10, 0), schedule_interval="10 10 * * *" ) dag_task1 = DummyOperator( @@ -2253,7 +2254,7 @@ class SchedulerJobTest(unittest.TestCase): dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) - self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 1, 10, 10)) + self.assertEquals(dr.execution_date, timezone.datetime(2016, 1, 1, 10, 10)) def test_scheduler_reschedule(self): """ @@ -2458,12 +2459,12 @@ class SchedulerJobTest(unittest.TestCase): self.assertTrue(dag.start_date > DEFAULT_DATE) expected_run_duration = 5 - start_time = datetime.datetime.now() + start_time = timezone.utcnow() scheduler = SchedulerJob(dag_id, run_duration=expected_run_duration, **self.default_scheduler_args) scheduler.run() - end_time = datetime.datetime.now() + end_time = timezone.utcnow() run_duration = (end_time - start_time).total_seconds() logging.info("Test ran in %.2fs, expected %.2fs", @@ -2503,7 +2504,7 @@ class SchedulerJobTest(unittest.TestCase): Test to check that a DAG returns it's active runs """ - now = datetime.datetime.now() + now = timezone.utcnow() six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0) START_DATE = six_hours_ago_to_the_hour @@ -2557,7 +2558,7 @@ class SchedulerJobTest(unittest.TestCase): Test to check that a DAG with catchup = False only schedules beginning now, not back to the start date """ - now = datetime.datetime.now() + now = timezone.utcnow() six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0) three_minutes_ago = now - datetime.timedelta(minutes=3) two_hours_and_three_minutes_ago = three_minutes_ago - datetime.timedelta(hours=2) @@ -2618,7 +2619,7 @@ class SchedulerJobTest(unittest.TestCase): self.assertGreater(dr.execution_date, three_minutes_ago) # The DR should be scheduled BEFORE now - self.assertLess(dr.execution_date, datetime.datetime.now()) + self.assertLess(dr.execution_date, timezone.utcnow()) dag3 = DAG(DAG_NAME3, schedule_interval='@hourly', @@ -2652,7 +2653,7 @@ class SchedulerJobTest(unittest.TestCase): self.assertGreater(dr.execution_date, two_hours_and_three_minutes_ago) # The DR should be scheduled BEFORE now - self.assertLess(dr.execution_date, datetime.datetime.now()) + self.assertLess(dr.execution_date, timezone.utcnow()) def test_add_unparseable_file_before_sched_start_creates_import_error(self): try: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index a1de17d..cabcf3a 100644 --- a/tests/models.py +++ b/tests/models.py @@ -20,6 +20,7 @@ from __future__ import unicode_literals import datetime import logging import os +import pendulum import unittest import time @@ -36,13 +37,14 @@ from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from airflow.operators.python_operator import ShortCircuitOperator from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep +from airflow.utils import timezone from airflow.utils.state import State from airflow.utils.trigger_rule import TriggerRule from mock import patch from parameterized import parameterized -DEFAULT_DATE = datetime.datetime(2016, 1, 1) +DEFAULT_DATE = timezone.datetime(2016, 1, 1) TEST_DAGS_FOLDER = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'dags') @@ -320,7 +322,7 @@ class DagStatTest(unittest.TestCase): with dag: op1 = DummyOperator(task_id='A') - now = datetime.datetime.now() + now = timezone.utcnow() dr = dag.create_dagrun( run_id='manual__' + now.isoformat(), execution_date=now, @@ -345,7 +347,7 @@ class DagStatTest(unittest.TestCase): class DagRunTest(unittest.TestCase): def create_dag_run(self, dag, state=State.RUNNING, task_states=None, execution_date=None): - now = datetime.datetime.now() + now = timezone.utcnow() if execution_date is None: execution_date = now dag_run = dag.create_dagrun( @@ -367,14 +369,14 @@ class DagRunTest(unittest.TestCase): def test_id_for_date(self): run_id = models.DagRun.id_for_date( - datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None)) + timezone.datetime(2015, 1, 2, 3, 4, 5, 6)) self.assertEqual( 'scheduled__2015-01-02T03:04:05', run_id, 'Generated run_id did not match expectations: {0}'.format(run_id)) def test_dagrun_find(self): session = settings.Session() - now = datetime.datetime.now() + now = timezone.utcnow() dag_id1 = "test_dagrun_find_externally_triggered" dag_run = models.DagRun( @@ -411,7 +413,7 @@ class DagRunTest(unittest.TestCase): """ dag = DAG( dag_id='test_dagrun_success_when_all_skipped', - start_date=datetime.datetime(2017, 1, 1) + start_date=timezone.datetime(2017, 1, 1) ) dag_task1 = ShortCircuitOperator( task_id='test_short_circuit_false', @@ -459,7 +461,7 @@ class DagRunTest(unittest.TestCase): dag.clear() - now = datetime.datetime.now() + now = timezone.utcnow() dr = dag.create_dagrun(run_id='test_dagrun_success_conditions', state=State.RUNNING, execution_date=now, @@ -498,7 +500,7 @@ class DagRunTest(unittest.TestCase): op2.set_upstream(op1) dag.clear() - now = datetime.datetime.now() + now = timezone.utcnow() dr = dag.create_dagrun(run_id='test_dagrun_deadlock', state=State.RUNNING, execution_date=now, @@ -556,7 +558,7 @@ class DagRunTest(unittest.TestCase): """ dag = DAG( dag_id='test_get_task_instance_on_empty_dagrun', - start_date=datetime.datetime(2017, 1, 1) + start_date=timezone.datetime(2017, 1, 1) ) dag_task1 = ShortCircuitOperator( task_id='test_short_circuit_false', @@ -565,7 +567,7 @@ class DagRunTest(unittest.TestCase): session = settings.Session() - now = datetime.datetime.now() + now = timezone.utcnow() # Don't use create_dagrun since it will create the task instances too which we # don't want @@ -589,14 +591,14 @@ class DagRunTest(unittest.TestCase): dag_id='test_latest_runs_1', start_date=DEFAULT_DATE) dag_1_run_1 = self.create_dag_run(dag, - execution_date=datetime.datetime(2015, 1, 1)) + execution_date=timezone.datetime(2015, 1, 1)) dag_1_run_2 = self.create_dag_run(dag, - execution_date=datetime.datetime(2015, 1, 2)) + execution_date=timezone.datetime(2015, 1, 2)) dagruns = models.DagRun.get_latest_runs(session) session.close() for dagrun in dagruns: if dagrun.dag_id == 'test_latest_runs_1': - self.assertEqual(dagrun.execution_date, datetime.datetime(2015, 1, 2)) + self.assertEqual(dagrun.execution_date, timezone.datetime(2015, 1, 2)) def test_is_backfill(self): dag = DAG(dag_id='test_is_backfill', start_date=DEFAULT_DATE) @@ -835,7 +837,7 @@ class TaskInstanceTest(unittest.TestCase): max_active_runs=1, concurrency=2) task = DummyOperator(task_id='test_requeue_over_concurrency_op', dag=dag) - ti = TI(task=task, execution_date=datetime.datetime.now()) + ti = TI(task=task, execution_date=timezone.utcnow()) ti.run() self.assertEqual(ti.state, models.State.NONE) @@ -852,9 +854,9 @@ class TaskInstanceTest(unittest.TestCase): dag = models.DAG(dag_id='test_run_pooling_task') task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag, pool='test_run_pooling_task_pool', owner='airflow', - start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) + start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( - task=task, execution_date=datetime.datetime.now()) + task=task, execution_date=timezone.utcnow()) ti.run() self.assertEqual(ti.state, models.State.SUCCESS) @@ -873,9 +875,9 @@ class TaskInstanceTest(unittest.TestCase): dag=dag, pool='test_run_pooling_task_with_mark_success_pool', owner='airflow', - start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) + start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( - task=task, execution_date=datetime.datetime.now()) + task=task, execution_date=timezone.utcnow()) ti.run(mark_success=True) self.assertEqual(ti.state, models.State.SUCCESS) @@ -894,9 +896,9 @@ class TaskInstanceTest(unittest.TestCase): dag=dag, python_callable=raise_skip_exception, owner='airflow', - start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) + start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( - task=task, execution_date=datetime.datetime.now()) + task=task, execution_date=timezone.utcnow()) ti.run() self.assertEqual(models.State.SKIPPED, ti.state) @@ -912,7 +914,7 @@ class TaskInstanceTest(unittest.TestCase): retry_delay=datetime.timedelta(seconds=3), dag=dag, owner='airflow', - start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) + start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) def run_with_error(ti): try: @@ -921,7 +923,7 @@ class TaskInstanceTest(unittest.TestCase): pass ti = TI( - task=task, execution_date=datetime.datetime.now()) + task=task, execution_date=timezone.utcnow()) # first run -- up for retry run_with_error(ti) @@ -953,7 +955,7 @@ class TaskInstanceTest(unittest.TestCase): retry_delay=datetime.timedelta(seconds=0), dag=dag, owner='airflow', - start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) + start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) def run_with_error(ti): try: @@ -962,7 +964,7 @@ class TaskInstanceTest(unittest.TestCase): pass ti = TI( - task=task, execution_date=datetime.datetime.now()) + task=task, execution_date=timezone.utcnow()) # first run -- up for retry run_with_error(ti) @@ -1002,25 +1004,28 @@ class TaskInstanceTest(unittest.TestCase): max_retry_delay=max_delay, dag=dag, owner='airflow', - start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) + start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( task=task, execution_date=DEFAULT_DATE) - ti.end_date = datetime.datetime.now() + ti.end_date = pendulum.instance(timezone.utcnow()) ti.try_number = 1 dt = ti.next_retry_datetime() # between 30 * 2^0.5 and 30 * 2^1 (15 and 30) - self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=20.0)) + period = ti.end_date.add(seconds=30) - ti.end_date.add(seconds=15) + self.assertTrue(dt in period) ti.try_number = 4 dt = ti.next_retry_datetime() # between 30 * 2^2 and 30 * 2^3 (120 and 240) - self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=181.0)) + period = ti.end_date.add(seconds=240) - ti.end_date.add(seconds=120) + self.assertTrue(dt in period) ti.try_number = 6 dt = ti.next_retry_datetime() # between 30 * 2^4 and 30 * 2^5 (480 and 960) - self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=825.0)) + period = ti.end_date.add(seconds=960) - ti.end_date.add(seconds=480) + self.assertTrue(dt in period) ti.try_number = 9 dt = ti.next_retry_datetime() @@ -1099,7 +1104,7 @@ class TaskInstanceTest(unittest.TestCase): failed, upstream_failed, done, flag_upstream_failed, expect_state, expect_completed): - start_date = datetime.datetime(2016, 2, 1, 0, 0, 0) + start_date = timezone.datetime(2016, 2, 1, 0, 0, 0) dag = models.DAG('test-dag', start_date=start_date) downstream = DummyOperator(task_id='downstream', dag=dag, owner='airflow', @@ -1137,8 +1142,8 @@ class TaskInstanceTest(unittest.TestCase): dag=dag, pool='test_xcom', owner='airflow', - start_date=datetime.datetime(2016, 6, 2, 0, 0, 0)) - exec_date = datetime.datetime.now() + start_date=timezone.datetime(2016, 6, 2, 0, 0, 0)) + exec_date = timezone.utcnow() ti = TI( task=task, execution_date=exec_date) ti.run(mark_success=True) @@ -1171,8 +1176,8 @@ class TaskInstanceTest(unittest.TestCase): dag=dag, pool='test_xcom', owner='airflow', - start_date=datetime.datetime(2016, 6, 2, 0, 0, 0)) - exec_date = datetime.datetime.now() + start_date=timezone.datetime(2016, 6, 2, 0, 0, 0)) + exec_date = timezone.utcnow() ti = TI( task=task, execution_date=exec_date) ti.run(mark_success=True) @@ -1213,8 +1218,8 @@ class TaskInstanceTest(unittest.TestCase): dag=dag, python_callable=lambda: 'error', owner='airflow', - start_date=datetime.datetime(2017, 2, 1)) - ti = TI(task=task, execution_date=datetime.datetime.now()) + start_date=timezone.datetime(2017, 2, 1)) + ti = TI(task=task, execution_date=timezone.utcnow()) with self.assertRaises(TestError): ti.run() @@ -1223,7 +1228,7 @@ class TaskInstanceTest(unittest.TestCase): dag = models.DAG(dag_id='test_check_and_change_state_before_execution') task = DummyOperator(task_id='task', dag=dag, start_date=DEFAULT_DATE) ti = TI( - task=task, execution_date=datetime.datetime.now()) + task=task, execution_date=timezone.utcnow()) self.assertTrue(ti._check_and_change_state_before_execution()) def test_check_and_change_state_before_execution_dep_not_met(self): @@ -1232,7 +1237,7 @@ class TaskInstanceTest(unittest.TestCase): task2= DummyOperator(task_id='task2', dag=dag, start_date=DEFAULT_DATE) task >> task2 ti = TI( - task=task2, execution_date=datetime.datetime.now()) + task=task2, execution_date=timezone.utcnow()) self.assertFalse(ti._check_and_change_state_before_execution()) def test_get_num_running_task_instances(self): @@ -1257,7 +1262,7 @@ class TaskInstanceTest(unittest.TestCase): self.assertEquals(1, ti1.get_num_running_task_instances(session=session)) self.assertEquals(1, ti2.get_num_running_task_instances(session=session)) self.assertEquals(1, ti3.get_num_running_task_instances(session=session)) - + class ClearTasksTest(unittest.TestCase): def test_clear_task_instances(self): @@ -1457,7 +1462,7 @@ class ClearTasksTest(unittest.TestCase): def test_xcom_disable_pickle_type(self): json_obj = {"key": "value"} - execution_date = datetime.datetime.now() + execution_date = timezone.utcnow() key = "xcom_test1" dag_id = "test_dag1" task_id = "test_task1" @@ -1479,7 +1484,7 @@ class ClearTasksTest(unittest.TestCase): def test_xcom_enable_pickle_type(self): json_obj = {"key": "value"} - execution_date = datetime.datetime.now() + execution_date = timezone.utcnow() key = "xcom_test2" dag_id = "test_dag2" task_id = "test_task2" @@ -1508,12 +1513,12 @@ class ClearTasksTest(unittest.TestCase): value=PickleRce(), dag_id="test_dag3", task_id="test_task3", - execution_date=datetime.datetime.now(), + execution_date=timezone.utcnow(), enable_pickling=False) def test_xcom_get_many(self): json_obj = {"key": "value"} - execution_date = datetime.datetime.now() + execution_date = timezone.utcnow() key = "xcom_test4" dag_id1 = "test_dag4" task_id1 = "test_task4" http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/latest_only_operator.py ---------------------------------------------------------------------- diff --git a/tests/operators/latest_only_operator.py b/tests/operators/latest_only_operator.py index 225d24f..44fff23 100644 --- a/tests/operators/latest_only_operator.py +++ b/tests/operators/latest_only_operator.py @@ -23,13 +23,14 @@ from airflow.jobs import BackfillJob from airflow.models import TaskInstance from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.operators.dummy_operator import DummyOperator +from airflow.utils import timezone from airflow.utils.state import State from freezegun import freeze_time -DEFAULT_DATE = datetime.datetime(2016, 1, 1) -END_DATE = datetime.datetime(2016, 1, 2) +DEFAULT_DATE = timezone.datetime(2016, 1, 1) +END_DATE = timezone.datetime(2016, 1, 2) INTERVAL = datetime.timedelta(hours=12) -FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1) +FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1) def get_task_instances(task_id): @@ -85,27 +86,27 @@ class LatestOnlyOperatorTest(unittest.TestCase): exec_date_to_latest_state = { ti.execution_date: ti.state for ti in latest_instances} self.assertEqual({ - datetime.datetime(2016, 1, 1): 'success', - datetime.datetime(2016, 1, 1, 12): 'success', - datetime.datetime(2016, 1, 2): 'success', }, + timezone.datetime(2016, 1, 1): 'success', + timezone.datetime(2016, 1, 1, 12): 'success', + timezone.datetime(2016, 1, 2): 'success', }, exec_date_to_latest_state) downstream_instances = get_task_instances('downstream') exec_date_to_downstream_state = { ti.execution_date: ti.state for ti in downstream_instances} self.assertEqual({ - datetime.datetime(2016, 1, 1): 'skipped', - datetime.datetime(2016, 1, 1, 12): 'skipped', - datetime.datetime(2016, 1, 2): 'success',}, + timezone.datetime(2016, 1, 1): 'skipped', + timezone.datetime(2016, 1, 1, 12): 'skipped', + timezone.datetime(2016, 1, 2): 'success',}, exec_date_to_downstream_state) downstream_instances = get_task_instances('downstream_2') exec_date_to_downstream_state = { ti.execution_date: ti.state for ti in downstream_instances} self.assertEqual({ - datetime.datetime(2016, 1, 1): 'skipped', - datetime.datetime(2016, 1, 1, 12): 'skipped', - datetime.datetime(2016, 1, 2): 'success',}, + timezone.datetime(2016, 1, 1): 'skipped', + timezone.datetime(2016, 1, 1, 12): 'skipped', + timezone.datetime(2016, 1, 2): 'success',}, exec_date_to_downstream_state) def test_skipping_dagrun(self): @@ -124,21 +125,21 @@ class LatestOnlyOperatorTest(unittest.TestCase): dr1 = self.dag.create_dagrun( run_id="manual__1", - start_date=datetime.datetime.now(), + start_date=timezone.utcnow(), execution_date=DEFAULT_DATE, state=State.RUNNING ) dr2 = self.dag.create_dagrun( run_id="manual__2", - start_date=datetime.datetime.now(), - execution_date=datetime.datetime(2016, 1, 1, 12), + start_date=timezone.utcnow(), + execution_date=timezone.datetime(2016, 1, 1, 12), state=State.RUNNING ) dr2 = self.dag.create_dagrun( run_id="manual__3", - start_date=datetime.datetime.now(), + start_date=timezone.utcnow(), execution_date=END_DATE, state=State.RUNNING ) @@ -151,25 +152,25 @@ class LatestOnlyOperatorTest(unittest.TestCase): exec_date_to_latest_state = { ti.execution_date: ti.state for ti in latest_instances} self.assertEqual({ - datetime.datetime(2016, 1, 1): 'success', - datetime.datetime(2016, 1, 1, 12): 'success', - datetime.datetime(2016, 1, 2): 'success', }, + timezone.datetime(2016, 1, 1): 'success', + timezone.datetime(2016, 1, 1, 12): 'success', + timezone.datetime(2016, 1, 2): 'success', }, exec_date_to_latest_state) downstream_instances = get_task_instances('downstream') exec_date_to_downstream_state = { ti.execution_date: ti.state for ti in downstream_instances} self.assertEqual({ - datetime.datetime(2016, 1, 1): 'skipped', - datetime.datetime(2016, 1, 1, 12): 'skipped', - datetime.datetime(2016, 1, 2): 'success',}, + timezone.datetime(2016, 1, 1): 'skipped', + timezone.datetime(2016, 1, 1, 12): 'skipped', + timezone.datetime(2016, 1, 2): 'success',}, exec_date_to_downstream_state) downstream_instances = get_task_instances('downstream_2') exec_date_to_downstream_state = { ti.execution_date: ti.state for ti in downstream_instances} self.assertEqual({ - datetime.datetime(2016, 1, 1): 'skipped', - datetime.datetime(2016, 1, 1, 12): 'skipped', - datetime.datetime(2016, 1, 2): 'success',}, + timezone.datetime(2016, 1, 1): 'skipped', + timezone.datetime(2016, 1, 1, 12): 'skipped', + timezone.datetime(2016, 1, 2): 'success',}, exec_date_to_downstream_state) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/operators.py ---------------------------------------------------------------------- diff --git a/tests/operators/operators.py b/tests/operators/operators.py index 0f5abd5..40f0ffd 100644 --- a/tests/operators/operators.py +++ b/tests/operators/operators.py @@ -14,16 +14,15 @@ from __future__ import print_function -import datetime - from airflow import DAG, configuration, operators from airflow.utils.tests import skipUnlessImported +from airflow.utils import timezone configuration.load_test_config() import unittest -DEFAULT_DATE = datetime.datetime(2015, 1, 1) +DEFAULT_DATE = timezone.datetime(2015, 1, 1) DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat() DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10] TEST_DAG_ID = 'unit_test_dag' @@ -251,7 +250,7 @@ class TransferTests(unittest.TestCase): def test_clear(self): self.dag.clear( start_date=DEFAULT_DATE, - end_date=datetime.datetime.now()) + end_date=timezone.utcnow()) def test_mysql_to_hive(self): # import airflow.operators http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/python_operator.py ---------------------------------------------------------------------- diff --git a/tests/operators/python_operator.py b/tests/operators/python_operator.py index 74120fe..6fa5e5a 100644 --- a/tests/operators/python_operator.py +++ b/tests/operators/python_operator.py @@ -23,15 +23,16 @@ from airflow.operators.python_operator import PythonOperator, BranchPythonOperat from airflow.operators.python_operator import ShortCircuitOperator from airflow.operators.dummy_operator import DummyOperator from airflow.settings import Session +from airflow.utils import timezone from airflow.utils.state import State from airflow.exceptions import AirflowException import logging -DEFAULT_DATE = datetime.datetime(2016, 1, 1) -END_DATE = datetime.datetime(2016, 1, 2) +DEFAULT_DATE = timezone.datetime(2016, 1, 1) +END_DATE = timezone.datetime(2016, 1, 2) INTERVAL = datetime.timedelta(hours=12) -FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1) +FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1) class PythonOperatorTest(unittest.TestCase): @@ -127,7 +128,7 @@ class BranchOperatorTest(unittest.TestCase): def test_with_dag_run(self): dr = self.dag.create_dagrun( run_id="manual__", - start_date=datetime.datetime.now(), + start_date=timezone.utcnow(), execution_date=DEFAULT_DATE, state=State.RUNNING ) @@ -225,7 +226,7 @@ class ShortCircuitOperatorTest(unittest.TestCase): logging.error("Tasks {}".format(dag.tasks)) dr = dag.create_dagrun( run_id="manual__", - start_date=datetime.datetime.now(), + start_date=timezone.utcnow(), execution_date=DEFAULT_DATE, state=State.RUNNING ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/sensors.py ---------------------------------------------------------------------- diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py index ee67524..d09dabe 100644 --- a/tests/operators/sensors.py +++ b/tests/operators/sensors.py @@ -15,7 +15,7 @@ import logging import sys import time import unittest -from datetime import datetime, timedelta +from datetime import timedelta from mock import patch from airflow import DAG, configuration, settings @@ -28,6 +28,8 @@ from airflow.operators.dummy_operator import DummyOperator from airflow.operators.sensors import HttpSensor, BaseSensorOperator, HdfsSensor, ExternalTaskSensor from airflow.utils.decorators import apply_defaults from airflow.utils.state import State +from airflow.utils import timezone +from airflow.utils.timezone import datetime try: from unittest import mock @@ -64,12 +66,12 @@ class TimeoutTestSensor(BaseSensorOperator): return self.return_value def execute(self, context): - started_at = datetime.now() + started_at = timezone.utcnow() time_jump = self.params.get('time_jump') while not self.poke(context): if time_jump: started_at -= time_jump - if (datetime.now() - started_at).total_seconds() > self.timeout: + if (timezone.utcnow() - started_at).total_seconds() > self.timeout: if self.soft_fail: raise AirflowSkipException('Snap. Time is OUT.') else: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/subdag_operator.py ---------------------------------------------------------------------- diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py index 9224f63..026eb3c 100644 --- a/tests/operators/subdag_operator.py +++ b/tests/operators/subdag_operator.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime import os import unittest @@ -25,14 +24,16 @@ from airflow.operators.dummy_operator import DummyOperator from airflow.operators.subdag_operator import SubDagOperator from airflow.jobs import BackfillJob from airflow.exceptions import AirflowException +from airflow.utils.timezone import datetime -DEFAULT_DATE = datetime.datetime(2016, 1, 1) +DEFAULT_DATE = datetime(2016, 1, 1) default_args = dict( owner='airflow', start_date=DEFAULT_DATE, ) + class SubDagOperatorTests(unittest.TestCase): def test_subdag_name(self): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/operators/test_virtualenv_operator.py ---------------------------------------------------------------------- diff --git a/tests/operators/test_virtualenv_operator.py b/tests/operators/test_virtualenv_operator.py index fdd2742..03623a6 100644 --- a/tests/operators/test_virtualenv_operator.py +++ b/tests/operators/test_virtualenv_operator.py @@ -15,6 +15,7 @@ from __future__ import print_function, unicode_literals import datetime + import funcsigs import sys import unittest @@ -25,15 +26,15 @@ from airflow import configuration, DAG from airflow.models import TaskInstance from airflow.operators.python_operator import PythonVirtualenvOperator from airflow.settings import Session -from airflow.utils.state import State +from airflow.utils import timezone from airflow.exceptions import AirflowException import logging -DEFAULT_DATE = datetime.datetime(2016, 1, 1) -END_DATE = datetime.datetime(2016, 1, 2) +DEFAULT_DATE = timezone.datetime(2016, 1, 1) +END_DATE = timezone.datetime(2016, 1, 2) INTERVAL = datetime.timedelta(hours=12) -FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1) +FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1) class TestPythonVirtualenvOperator(unittest.TestCase): @@ -185,7 +186,7 @@ class TestPythonVirtualenvOperator(unittest.TestCase): def test_nonimported_as_arg(self): def f(a): return None - self._run_as_operator(f, op_args=[datetime.datetime.now()]) + self._run_as_operator(f, op_args=[datetime.datetime.utcnow()]) def test_context(self): def f(**kwargs): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/ti_deps/deps/test_not_in_retry_period_dep.py ---------------------------------------------------------------------- diff --git a/tests/ti_deps/deps/test_not_in_retry_period_dep.py b/tests/ti_deps/deps/test_not_in_retry_period_dep.py index 0f23aab..38502fb 100644 --- a/tests/ti_deps/deps/test_not_in_retry_period_dep.py +++ b/tests/ti_deps/deps/test_not_in_retry_period_dep.py @@ -13,13 +13,14 @@ # limitations under the License. import unittest -from datetime import datetime, timedelta +from datetime import timedelta from freezegun import freeze_time from mock import Mock from airflow.models import TaskInstance from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep from airflow.utils.state import State +from airflow.utils.timezone import datetime class NotInRetryPeriodDepTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/ti_deps/deps/test_runnable_exec_date_dep.py ---------------------------------------------------------------------- diff --git a/tests/ti_deps/deps/test_runnable_exec_date_dep.py b/tests/ti_deps/deps/test_runnable_exec_date_dep.py index e1a396c..28b285f 100644 --- a/tests/ti_deps/deps/test_runnable_exec_date_dep.py +++ b/tests/ti_deps/deps/test_runnable_exec_date_dep.py @@ -13,13 +13,12 @@ # limitations under the License. import unittest -from datetime import datetime from freezegun import freeze_time from mock import Mock from airflow.models import TaskInstance from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep - +from airflow.utils.timezone import datetime class RunnableExecDateDepTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/utils/log/test_file_processor_handler.py ---------------------------------------------------------------------- diff --git a/tests/utils/log/test_file_processor_handler.py b/tests/utils/log/test_file_processor_handler.py index defe623..8a3bbd2 100644 --- a/tests/utils/log/test_file_processor_handler.py +++ b/tests/utils/log/test_file_processor_handler.py @@ -17,7 +17,7 @@ import os import unittest from airflow.utils.log.file_processor_handler import FileProcessorHandler -from datetime import datetime +from airflow.utils import timezone from datetime import timedelta from freezegun import freeze_time @@ -31,7 +31,7 @@ class TestFileProcessorHandler(unittest.TestCase): self.dag_dir = "/dags" def test_non_template(self): - date = datetime.utcnow().strftime("%Y-%m-%d") + date = timezone.utcnow().strftime("%Y-%m-%d") handler = FileProcessorHandler(base_log_folder=self.base_log_folder, filename_template=self.filename) handler.dag_dir = self.dag_dir @@ -44,7 +44,7 @@ class TestFileProcessorHandler(unittest.TestCase): self.assertTrue(os.path.exists(os.path.join(path, "logfile"))) def test_template(self): - date = datetime.utcnow().strftime("%Y-%m-%d") + date = timezone.utcnow().strftime("%Y-%m-%d") handler = FileProcessorHandler(base_log_folder=self.base_log_folder, filename_template=self.filename_template) handler.dag_dir = self.dag_dir @@ -61,8 +61,8 @@ class TestFileProcessorHandler(unittest.TestCase): filename_template=self.filename) handler.dag_dir = self.dag_dir - date1 = (datetime.utcnow() + timedelta(days=1)).strftime("%Y-%m-%d") - date2 = (datetime.utcnow() + timedelta(days=2)).strftime("%Y-%m-%d") + date1 = (timezone.utcnow() + timedelta(days=1)).strftime("%Y-%m-%d") + date2 = (timezone.utcnow() + timedelta(days=2)).strftime("%Y-%m-%d") p1 = os.path.join(self.base_log_folder, date1, "log1") p2 = os.path.join(self.base_log_folder, date1, "log2") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/utils/log/test_s3_task_handler.py ---------------------------------------------------------------------- diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py index b1354cd..dc32b5a 100644 --- a/tests/utils/log/test_s3_task_handler.py +++ b/tests/utils/log/test_s3_task_handler.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime import mock import unittest from airflow import configuration from airflow.utils.log.s3_task_handler import S3TaskHandler +from airflow.utils.timezone import datetime from airflow.hooks.S3_hook import S3Hook from airflow.models import TaskInstance, DAG from airflow.operators.dummy_operator import DummyOperator http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/utils/test_dates.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_dates.py b/tests/utils/test_dates.py index 50e76ba..199de4a 100644 --- a/tests/utils/test_dates.py +++ b/tests/utils/test_dates.py @@ -13,16 +13,18 @@ # limitations under the License. from datetime import datetime, timedelta +import pendulum import unittest from airflow.utils import dates +from airflow.utils import timezone class Dates(unittest.TestCase): def test_days_ago(self): - today = datetime.today() - today_midnight = datetime.fromordinal(today.date().toordinal()) + today = pendulum.today() + today_midnight = pendulum.instance(datetime.fromordinal(today.date().toordinal())) self.assertTrue(dates.days_ago(0) == today_midnight) @@ -43,9 +45,9 @@ class Dates(unittest.TestCase): def test_parse_execution_date(self): execution_date_str_wo_ms = '2017-11-02 00:00:00' - execution_date_str_w_ms = '2017-11-05 16:18:30..989729' - bad_execution_date_str = '2017-11-06T00:00:00Z' + execution_date_str_w_ms = '2017-11-05 16:18:30.989729' + bad_execution_date_str = '2017-11-06TXX:00:00Z' - self.assertEqual(datetime(2017, 11, 2, 0, 0, 0), dates.parse_execution_date(execution_date_str_wo_ms)) - self.assertEqual(datetime(2017, 11, 5, 16, 18, 30, 989729), dates.parse_execution_date(execution_date_str_w_ms)) + self.assertEqual(timezone.datetime(2017, 11, 2, 0, 0, 0), dates.parse_execution_date(execution_date_str_wo_ms)) + self.assertEqual(timezone.datetime(2017, 11, 5, 16, 18, 30, 989729), dates.parse_execution_date(execution_date_str_w_ms)) self.assertRaises(ValueError, dates.parse_execution_date, bad_execution_date_str) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/utils/test_log_handlers.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 25faa7c..fd5006c 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -19,10 +19,10 @@ import mock import os import unittest -from datetime import datetime from airflow.models import TaskInstance, DAG from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.operators.dummy_operator import DummyOperator +from airflow.utils.timezone import datetime from airflow.utils.log.file_task_handler import FileTaskHandler DEFAULT_DATE = datetime(2016, 1, 1) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/www/api/experimental/test_endpoints.py ---------------------------------------------------------------------- diff --git a/tests/www/api/experimental/test_endpoints.py b/tests/www/api/experimental/test_endpoints.py index 65a6f75..2c510a6 100644 --- a/tests/www/api/experimental/test_endpoints.py +++ b/tests/www/api/experimental/test_endpoints.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime, timedelta +from datetime import timedelta import json import unittest from urllib.parse import quote_plus @@ -21,6 +21,7 @@ from airflow import configuration from airflow.api.common.experimental.trigger_dag import trigger_dag from airflow.models import DagBag, DagRun, Pool, TaskInstance from airflow.settings import Session +from airflow.utils.timezone import datetime, utcnow from airflow.www import app as application @@ -75,7 +76,7 @@ class TestApiExperimental(unittest.TestCase): url_template = '/api/experimental/dags/{}/dag_runs' response = self.app.post( url_template.format('example_bash_operator'), - data=json.dumps({'run_id': 'my_run' + datetime.now().isoformat()}), + data=json.dumps({'run_id': 'my_run' + utcnow().isoformat()}), content_type="application/json" ) @@ -91,7 +92,7 @@ class TestApiExperimental(unittest.TestCase): def test_trigger_dag_for_date(self): url_template = '/api/experimental/dags/{}/dag_runs' dag_id = 'example_bash_operator' - hour_from_now = datetime.now() + timedelta(hours=1) + hour_from_now = utcnow() + timedelta(hours=1) execution_date = datetime(hour_from_now.year, hour_from_now.month, hour_from_now.day, @@ -133,7 +134,7 @@ class TestApiExperimental(unittest.TestCase): url_template = '/api/experimental/dags/{}/dag_runs/{}/tasks/{}' dag_id = 'example_bash_operator' task_id = 'also_run_this' - execution_date = datetime.now().replace(microsecond=0) + execution_date = utcnow().replace(microsecond=0) datetime_string = quote_plus(execution_date.isoformat()) wrong_datetime_string = quote_plus( datetime(1990, 1, 1, 1, 1, 1).isoformat() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9624f5f2/tests/www/test_views.py ---------------------------------------------------------------------- diff --git a/tests/www/test_views.py b/tests/www/test_views.py index 4931487..a9bb28f 100644 --- a/tests/www/test_views.py +++ b/tests/www/test_views.py @@ -18,7 +18,6 @@ import os import shutil import tempfile import unittest -from datetime import datetime import sys from airflow import models, configuration, settings @@ -26,6 +25,7 @@ from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONF from airflow.models import DAG, TaskInstance from airflow.operators.dummy_operator import DummyOperator from airflow.settings import Session +from airflow.utils.timezone import datetime from airflow.www import app as application from airflow import configuration as conf
