ksolan closed pull request #4257: Ksolan/pendulum 2.x
URL: https://github.com/apache/incubator-airflow/pull/4257
This is a PR merged from a forked repository.
Because GitHub hides the original diff once a pull request from a fork
is merged, the diff is reproduced below for the sake of provenance:
diff --git a/airflow/settings.py b/airflow/settings.py
index 8f8420ea22..dd0380f519 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -213,15 +213,15 @@ def dispose_orm():
def configure_adapters():
- from pendulum import Pendulum
+ from pendulum import datetime
try:
from sqlite3 import register_adapter
- register_adapter(Pendulum, lambda val: val.isoformat(' '))
+ register_adapter(datetime, lambda val: val.isoformat(' '))
except ImportError:
pass
try:
import MySQLdb.converters
- MySQLdb.converters.conversions[Pendulum] =
MySQLdb.converters.DateTime2literal
+ MySQLdb.converters.conversions[datetime] =
MySQLdb.converters.DateTime2literal
except ImportError:
pass
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index 5adaa2f5c4..1edddb73ec 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -169,4 +169,4 @@ def parse(string, timezone=None):
Parse a time string and return an aware datetime
:param string: time string
"""
- return pendulum.parse(string, tz=timezone or TIMEZONE)
+ return pendulum.parse(string, strict=False, tz=timezone or TIMEZONE)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 8792191e21..ec674d7379 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -315,7 +315,7 @@ def get_chart_height(dag):
def get_date_time_num_runs_dag_runs_form_data(request, session, dag):
dttm = request.args.get('execution_date')
if dttm:
- dttm = pendulum.parse(dttm)
+ dttm = pendulum.parse(dttm, strict=False)
else:
dttm = dag.latest_execution_date or timezone.utcnow()
@@ -729,7 +729,7 @@ def rendered(self):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
- dttm = pendulum.parse(execution_date)
+ dttm = pendulum.parse(execution_date, strict=False)
form = DateTimeForm(data={'execution_date': dttm})
dag = dagbag.get_dag(dag_id)
task = copy.copy(dag.get_task(task_id))
@@ -765,7 +765,7 @@ def get_logs_with_metadata(self, session=None):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
- dttm = pendulum.parse(execution_date)
+ dttm = pendulum.parse(execution_date, strict=False)
try_number = int(request.args.get('try_number'))
metadata = request.args.get('metadata')
metadata = json.loads(metadata)
@@ -824,7 +824,7 @@ def log(self, session=None):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
- dttm = pendulum.parse(execution_date)
+ dttm = pendulum.parse(execution_date, strict=False)
form = DateTimeForm(data={'execution_date': dttm})
dag = dagbag.get_dag(dag_id)
@@ -851,7 +851,7 @@ def task(self):
# Carrying execution_date through, even though it's irrelevant for
# this context
execution_date = request.args.get('execution_date')
- dttm = pendulum.parse(execution_date)
+ dttm = pendulum.parse(execution_date, strict=False)
form = DateTimeForm(data={'execution_date': dttm})
dag = dagbag.get_dag(dag_id)
@@ -938,7 +938,7 @@ def xcom(self, session=None):
# Carrying execution_date through, even though it's irrelevant for
# this context
execution_date = request.args.get('execution_date')
- dttm = pendulum.parse(execution_date)
+ dttm = pendulum.parse(execution_date, strict=False)
form = DateTimeForm(data={'execution_date': dttm})
dag = dagbag.get_dag(dag_id)
if not dag or task_id not in dag.task_ids:
@@ -978,7 +978,7 @@ def run(self):
task = dag.get_task(task_id)
execution_date = request.args.get('execution_date')
- execution_date = pendulum.parse(execution_date)
+ execution_date = pendulum.parse(execution_date, strict=False)
ignore_all_deps = request.args.get('ignore_all_deps') == "true"
ignore_task_deps = request.args.get('ignore_task_deps') == "true"
ignore_ti_state = request.args.get('ignore_ti_state') == "true"
@@ -1141,7 +1141,7 @@ def clear(self):
dag = dagbag.get_dag(dag_id)
execution_date = request.args.get('execution_date')
- execution_date = pendulum.parse(execution_date)
+ execution_date = pendulum.parse(execution_date, strict=False)
confirmed = request.args.get('confirmed') == "true"
upstream = request.args.get('upstream') == "true"
downstream = request.args.get('downstream') == "true"
@@ -1171,7 +1171,7 @@ def dagrun_clear(self):
confirmed = request.args.get('confirmed') == "true"
dag = dagbag.get_dag(dag_id)
- execution_date = pendulum.parse(execution_date)
+ execution_date = pendulum.parse(execution_date, strict=False)
start_date = execution_date
end_date = execution_date
@@ -1206,7 +1206,7 @@ def _mark_dagrun_state_as_failed(self, dag_id,
execution_date, confirmed, origin
flash('Invalid execution date', 'error')
return redirect(origin)
- execution_date = pendulum.parse(execution_date)
+ execution_date = pendulum.parse(execution_date, strict=False)
dag = dagbag.get_dag(dag_id)
if not dag:
@@ -1234,7 +1234,7 @@ def _mark_dagrun_state_as_success(self, dag_id,
execution_date, confirmed, origi
flash('Invalid execution date', 'error')
return redirect(origin)
- execution_date = pendulum.parse(execution_date)
+ execution_date = pendulum.parse(execution_date, strict=False)
dag = dagbag.get_dag(dag_id)
if not dag:
@@ -1289,7 +1289,7 @@ def _mark_task_instance_state(self, dag_id, task_id,
origin, execution_date,
task = dag.get_task(task_id)
task.dag = dag
- execution_date = pendulum.parse(execution_date)
+ execution_date = pendulum.parse(execution_date, strict=False)
if not dag:
flash("Cannot find DAG: {}".format(dag_id))
@@ -1447,7 +1447,7 @@ def recurse_nodes(task, visited):
def set_duration(tid):
if isinstance(tid, dict) and tid.get("state") == State.RUNNING
\
and tid["start_date"] is not None:
- d = timezone.utcnow() - pendulum.parse(tid["start_date"])
+ d = timezone.utcnow() - pendulum.parse(tid["start_date"],
strict=False)
tid["duration"] = d.total_seconds()
return tid
@@ -1606,7 +1606,7 @@ def duration(self, session=None):
return redirect('/admin/')
if base_date:
- base_date = pendulum.parse(base_date)
+ base_date = pendulum.parse(base_date, strict=False)
else:
base_date = dag.latest_execution_date or timezone.utcnow()
@@ -1714,7 +1714,7 @@ def tries(self, session=None):
num_runs = int(num_runs) if num_runs else default_dag_run
if base_date:
- base_date = pendulum.parse(base_date)
+ base_date = pendulum.parse(base_date, strict=False)
else:
base_date = dag.latest_execution_date or timezone.utcnow()
@@ -1778,7 +1778,7 @@ def landing_times(self, session=None):
num_runs = int(num_runs) if num_runs else default_dag_run
if base_date:
- base_date = pendulum.parse(base_date)
+ base_date = pendulum.parse(base_date, strict=False)
else:
base_date = dag.latest_execution_date or timezone.utcnow()
@@ -2000,7 +2000,7 @@ def task_instances(self, session=None):
dttm = request.args.get('execution_date')
if dttm:
- dttm = pendulum.parse(dttm)
+ dttm = pendulum.parse(dttm, strict=False)
else:
return "Error: Invalid execution_date"
@@ -2939,7 +2939,7 @@ def get_one(self, id):
https://github.com/flask-admin/flask-admin/issues/1226
"""
task_id, dag_id, execution_date = iterdecode(id)
- execution_date = pendulum.parse(execution_date)
+ execution_date = pendulum.parse(execution_date, strict=False)
return self.session.query(self.model).get((task_id, dag_id,
execution_date))
diff --git a/airflow/www_rbac/decorators.py b/airflow/www_rbac/decorators.py
index 8f962ef840..4215ac030d 100644
--- a/airflow/www_rbac/decorators.py
+++ b/airflow/www_rbac/decorators.py
@@ -47,7 +47,7 @@ def wrapper(*args, **kwargs):
if 'execution_date' in request.args:
log.execution_date = pendulum.parse(
- request.args.get('execution_date'))
+ request.args.get('execution_date'), strict=False)
session.add(log)
session.commit()
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 49a9a734cc..643c099db5 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -81,7 +81,7 @@
def get_date_time_num_runs_dag_runs_form_data(request, session, dag):
dttm = request.args.get('execution_date')
if dttm:
- dttm = pendulum.parse(dttm)
+ dttm = pendulum.parse(dttm, strict=False)
else:
dttm = dag.latest_execution_date or timezone.utcnow()
@@ -460,7 +460,7 @@ def rendered(self):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
- dttm = pendulum.parse(execution_date)
+ dttm = pendulum.parse(execution_date, strict=False)
form = DateTimeForm(data={'execution_date': dttm})
dag = dagbag.get_dag(dag_id)
task = copy.copy(dag.get_task(task_id))
@@ -498,7 +498,7 @@ def get_logs_with_metadata(self, session=None):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
- dttm = pendulum.parse(execution_date)
+ dttm = pendulum.parse(execution_date, strict=False)
try_number = int(request.args.get('try_number'))
metadata = request.args.get('metadata')
metadata = json.loads(metadata)
@@ -558,7 +558,7 @@ def log(self, session=None):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
- dttm = pendulum.parse(execution_date)
+ dttm = pendulum.parse(execution_date, strict=False)
form = DateTimeForm(data={'execution_date': dttm})
dag = dagbag.get_dag(dag_id)
@@ -586,7 +586,7 @@ def task(self):
# Carrying execution_date through, even though it's irrelevant for
# this context
execution_date = request.args.get('execution_date')
- dttm = pendulum.parse(execution_date)
+ dttm = pendulum.parse(execution_date, strict=False)
form = DateTimeForm(data={'execution_date': dttm})
dag = dagbag.get_dag(dag_id)
@@ -663,7 +663,7 @@ def xcom(self, session=None):
# Carrying execution_date through, even though it's irrelevant for
# this context
execution_date = request.args.get('execution_date')
- dttm = pendulum.parse(execution_date)
+ dttm = pendulum.parse(execution_date, strict=False)
form = DateTimeForm(data={'execution_date': dttm})
dag = dagbag.get_dag(dag_id)
if not dag or task_id not in dag.task_ids:
@@ -703,7 +703,7 @@ def run(self):
task = dag.get_task(task_id)
execution_date = request.args.get('execution_date')
- execution_date = pendulum.parse(execution_date)
+ execution_date = pendulum.parse(execution_date, strict=False)
ignore_all_deps = request.args.get('ignore_all_deps') == "true"
ignore_task_deps = request.args.get('ignore_task_deps') == "true"
ignore_ti_state = request.args.get('ignore_ti_state') == "true"
@@ -868,7 +868,7 @@ def clear(self):
dag = dagbag.get_dag(dag_id)
execution_date = request.args.get('execution_date')
- execution_date = pendulum.parse(execution_date)
+ execution_date = pendulum.parse(execution_date, strict=False)
confirmed = request.args.get('confirmed') == "true"
upstream = request.args.get('upstream') == "true"
downstream = request.args.get('downstream') == "true"
@@ -898,7 +898,7 @@ def dagrun_clear(self):
confirmed = request.args.get('confirmed') == "true"
dag = dagbag.get_dag(dag_id)
- execution_date = pendulum.parse(execution_date)
+ execution_date = pendulum.parse(execution_date, strict=False)
start_date = execution_date
end_date = execution_date
@@ -940,7 +940,7 @@ def _mark_dagrun_state_as_failed(self, dag_id,
execution_date, confirmed, origin
flash('Invalid execution date', 'error')
return redirect(origin)
- execution_date = pendulum.parse(execution_date)
+ execution_date = pendulum.parse(execution_date, strict=False)
dag = dagbag.get_dag(dag_id)
if not dag:
@@ -968,7 +968,7 @@ def _mark_dagrun_state_as_success(self, dag_id,
execution_date, confirmed, origi
flash('Invalid execution date', 'error')
return redirect(origin)
- execution_date = pendulum.parse(execution_date)
+ execution_date = pendulum.parse(execution_date, strict=False)
dag = dagbag.get_dag(dag_id)
if not dag:
@@ -1023,7 +1023,7 @@ def _mark_task_instance_state(self, dag_id, task_id,
origin, execution_date,
task = dag.get_task(task_id)
task.dag = dag
- execution_date = pendulum.parse(execution_date)
+ execution_date = pendulum.parse(execution_date, strict=False)
if not dag:
flash("Cannot find DAG: {}".format(dag_id))
@@ -1182,7 +1182,7 @@ def recurse_nodes(task, visited):
def set_duration(tid):
if (isinstance(tid, dict) and tid.get("state") ==
State.RUNNING and
tid["start_date"] is not None):
- d = timezone.utcnow() - pendulum.parse(tid["start_date"])
+ d = timezone.utcnow() - pendulum.parse(tid["start_date"],
strict=False)
tid["duration"] = d.total_seconds()
return tid
@@ -1348,7 +1348,7 @@ def duration(self, session=None):
return redirect('/')
if base_date:
- base_date = pendulum.parse(base_date)
+ base_date = pendulum.parse(base_date, strict=False)
else:
base_date = dag.latest_execution_date or timezone.utcnow()
@@ -1456,7 +1456,7 @@ def tries(self, session=None):
num_runs = int(num_runs) if num_runs else default_dag_run
if base_date:
- base_date = pendulum.parse(base_date)
+ base_date = pendulum.parse(base_date, strict=False)
else:
base_date = dag.latest_execution_date or timezone.utcnow()
@@ -1521,7 +1521,7 @@ def landing_times(self, session=None):
num_runs = int(num_runs) if num_runs else default_dag_run
if base_date:
- base_date = pendulum.parse(base_date)
+ base_date = pendulum.parse(base_date, strict=False)
else:
base_date = dag.latest_execution_date or timezone.utcnow()
@@ -1755,7 +1755,7 @@ def task_instances(self, session=None):
dttm = request.args.get('execution_date')
if dttm:
- dttm = pendulum.parse(dttm)
+ dttm = pendulum.parse(dttm, strict=False)
else:
return "Error: Invalid execution_date"
@@ -2389,7 +2389,7 @@ def get_one(self, id):
Flask-Admin side.
https://github.com/flask-admin/flask-admin/issues/1226
"""
task_id, dag_id, execution_date = iterdecode(id) # noqa
- execution_date = pendulum.parse(execution_date)
+ execution_date = pendulum.parse(execution_date, strict=False)
return self.session.query(self.model).get((task_id, dag_id,
execution_date))
diff --git a/setup.py b/setup.py
index aa80026fc9..db534db09d 100644
--- a/setup.py
+++ b/setup.py
@@ -315,7 +315,7 @@ def do_setup():
'lxml>=4.0.0',
'markdown>=2.5.2, <3.0',
'pandas>=0.17.1, <1.0.0',
- 'pendulum==1.4.4',
+ 'pendulum==2.0.3',
'psutil>=4.2.0, <6.0.0',
'pygments>=2.0.1, <3.0',
'python-daemon>=2.1.1, <2.2',
diff --git a/tests/core.py b/tests/core.py
index b297edcbcd..185ab15322 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -24,6 +24,7 @@
import bleach
import doctest
+import pendulum
import mock
import multiprocessing
import os
@@ -68,7 +69,6 @@
from airflow.configuration import AirflowConfigException, run_command
from jinja2.sandbox import SecurityError
from jinja2 import UndefinedError
-from pendulum import utcnow
import six
@@ -311,7 +311,8 @@ def test_schedule_dag_no_end_date_up_to_today_only(self):
"""
session = settings.Session()
delta = timedelta(days=1)
- now = utcnow()
+ now = datetime.datetime.utcnow()\
+ .replace(tzinfo=pendulum.timezone('UTC'))
start_date = now.subtract(weeks=1)
runs = (now - start_date).days
diff --git a/tests/ti_deps/deps/test_prev_dagrun_dep.py
b/tests/ti_deps/deps/test_prev_dagrun_dep.py
index e5d8cdf2b1..b73df2e21c 100644
--- a/tests/ti_deps/deps/test_prev_dagrun_dep.py
+++ b/tests/ti_deps/deps/test_prev_dagrun_dep.py
@@ -21,12 +21,16 @@
from datetime import datetime
from mock import Mock
+import pendulum
from airflow.models import DAG, BaseOperator
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
from airflow.utils.state import State
+UTC = pendulum.timezone('UTC')
+
+
class PrevDagrunDepTest(unittest.TestCase):
def _get_task(self, **kwargs):
@@ -42,9 +46,9 @@ def test_not_depends_on_past(self):
wait_for_downstream=False)
prev_ti = Mock(task=task, state=State.SUCCESS,
are_dependents_done=Mock(return_value=True),
- execution_date=datetime(2016, 1, 2))
+ execution_date=datetime(2016, 1, 2, tzinfo=UTC))
ti = Mock(task=task, previous_ti=prev_ti,
- execution_date=datetime(2016, 1, 3))
+ execution_date=datetime(2016, 1, 3, tzinfo=UTC))
dep_context = DepContext(ignore_depends_on_past=False)
self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
@@ -59,9 +63,9 @@ def test_context_ignore_depends_on_past(self):
wait_for_downstream=False)
prev_ti = Mock(task=task, state=State.SUCCESS,
are_dependents_done=Mock(return_value=True),
- execution_date=datetime(2016, 1, 2))
+ execution_date=datetime(2016, 1, 2, tzinfo=UTC))
ti = Mock(task=task, previous_ti=prev_ti,
- execution_date=datetime(2016, 1, 3))
+ execution_date=datetime(2016, 1, 3, tzinfo=UTC))
dep_context = DepContext(ignore_depends_on_past=True)
self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
@@ -75,7 +79,7 @@ def test_first_task_run(self):
wait_for_downstream=False)
prev_ti = None
ti = Mock(task=task, previous_ti=prev_ti,
- execution_date=datetime(2016, 1, 1))
+ execution_date=datetime(2016, 1, 1, tzinfo=UTC))
dep_context = DepContext(ignore_depends_on_past=False)
self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
@@ -90,7 +94,7 @@ def test_prev_ti_bad_state(self):
prev_ti = Mock(state=State.NONE,
are_dependents_done=Mock(return_value=True))
ti = Mock(task=task, previous_ti=prev_ti,
- execution_date=datetime(2016, 1, 2))
+ execution_date=datetime(2016, 1, 2, tzinfo=UTC))
dep_context = DepContext(ignore_depends_on_past=False)
self.assertFalse(PrevDagrunDep().is_met(ti=ti,
dep_context=dep_context))
@@ -107,7 +111,7 @@ def test_failed_wait_for_downstream(self):
prev_ti = Mock(state=State.SUCCESS,
are_dependents_done=Mock(return_value=False))
ti = Mock(task=task, previous_ti=prev_ti,
- execution_date=datetime(2016, 1, 2))
+ execution_date=datetime(2016, 1, 2, tzinfo=UTC))
dep_context = DepContext(ignore_depends_on_past=False)
self.assertFalse(PrevDagrunDep().is_met(ti=ti,
dep_context=dep_context))
@@ -122,7 +126,7 @@ def test_all_met(self):
prev_ti = Mock(state=State.SUCCESS,
are_dependents_done=Mock(return_value=True))
ti = Mock(task=task, previous_ti=prev_ti,
- execution_date=datetime(2016, 1, 2))
+ execution_date=datetime(2016, 1, 2, tzinfo=UTC))
dep_context = DepContext(ignore_depends_on_past=False)
self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services