ashb commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r660465772
##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -135,7 +136,7 @@
'label': 'custom_task',
},
],
- "timezone": "UTC",
+ "schedule_timezone": "UTC",
Review comment:
If we can find a way to not have to do `return
pendulum.parse("2021-01-01T12:00:00" + tz_name).tzinfo` I'd be happy with that
approach, but with that it just feels too hacky I think.
##########
File path: airflow/models/dag.py
##########
@@ -107,6 +108,40 @@ def get_last_dagrun(dag_id, session,
include_externally_triggered=False):
return query.first()
+def _get_tzname_or_offset(obj):
Review comment:
Should live in airflow.utils.timezone
##########
File path: airflow/models/dag.py
##########
@@ -107,6 +108,40 @@ def get_last_dagrun(dag_id, session,
include_externally_triggered=False):
return query.first()
+def _get_tzname_or_offset(obj):
+ if _is_valid_pendulum_tzname(obj):
+ return obj
+ if hasattr(obj, 'name'):
+ candidate = obj.name
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ try:
+ candidate = obj.tzname(datetime.now(timezone.utc))
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ except (NotImplementedError, ValueError):
+ pass
+
+ try:
+ candidate =
int(obj.utcoffset(datetime.now(timezone.utc)).total_seconds())
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ except (NotImplementedError, ValueError):
+ pass
+
+ raise ValueError(f"Can't get a timezone name or offset from {obj}")
+
+
+def _is_valid_pendulum_tzname(name):
+ if isinstance(name, str) or isinstance(name, int):
Review comment:
```suggestion
if isinstance(name, (str, int)):
```
##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -712,7 +714,8 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) ->
'SerializedDAG':
v = {task["task_id"]:
SerializedBaseOperator.deserialize_operator(task) for task in v}
k = "task_dict"
elif k == "timezone":
- v = cls._deserialize_timezone(v)
+ dag._schedule_timezone = v
Review comment:
Not needed if we keep `dag.timezone` as an attribute.
##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,7 +186,9 @@ def serialize_to_json(
if cls._is_excluded(value, key, object_to_serialize):
continue
- if key in decorated_fields:
+ if key == 'timezone':
+ serialized_object[key] =
cls._serialize(object_to_serialize._schedule_timezone)
+ elif key in decorated_fields:
Review comment:
We shouldn't need this here, handle it in `def _serialize` instead.
##########
File path: airflow/models/dag.py
##########
@@ -107,6 +108,40 @@ def get_last_dagrun(dag_id, session,
include_externally_triggered=False):
return query.first()
+def _get_tzname_or_offset(obj):
+ if _is_valid_pendulum_tzname(obj):
+ return obj
+ if hasattr(obj, 'name'):
+ candidate = obj.name
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ try:
+ candidate = obj.tzname(datetime.now(timezone.utc))
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ except (NotImplementedError, ValueError):
+ pass
+
+ try:
+ candidate =
int(obj.utcoffset(datetime.now(timezone.utc)).total_seconds())
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ except (NotImplementedError, ValueError):
+ pass
+
+ raise ValueError(f"Can't get a timezone name or offset from {obj}")
+
+
+def _is_valid_pendulum_tzname(name):
+ if isinstance(name, str) or isinstance(name, int):
+ try:
+ return pendulum.timezone(name)
Review comment:
```suggestion
pendulum.timezone(name)
return True
```
##########
File path: airflow/models/dag.py
##########
@@ -2209,7 +2249,7 @@ def get_num_task_instances(dag_id, task_ids=None,
states=None, session=None):
def get_serialized_fields(cls):
"""Stringified DAGs and operators contain exactly these fields."""
if not cls.__serialized_fields:
- cls.__serialized_fields =
frozenset(vars(DAG(dag_id='test')).keys()) - {
+ cls.__serialized_fields =
frozenset(set(vars(DAG(dag_id='test')).keys())) - {
Review comment:
Unnecessary change if we keep the timezone as an actual property.
##########
File path: airflow/models/dag.py
##########
@@ -304,14 +339,15 @@ def __init__(
# set timezone from start_date
if start_date and start_date.tzinfo:
- self.timezone = start_date.tzinfo
+ self._schedule_timezone = _get_tzname_or_offset(start_date.tzinfo)
elif 'start_date' in self.default_args and
self.default_args['start_date']:
if isinstance(self.default_args['start_date'], str):
self.default_args['start_date'] =
timezone.parse(self.default_args['start_date'])
- self.timezone = self.default_args['start_date'].tzinfo
+ if self.default_args['start_date'] and
self.default_args['start_date'].tzinfo:
+ self._schedule_timezone =
_get_tzname_or_offset(self.default_args['start_date'].tzinfo)
- if not hasattr(self, 'timezone') or not self.timezone:
- self.timezone = settings.TIMEZONE
+ if not hasattr(self, '_schedule_timezone'):
+ self._schedule_timezone = _get_tzname_or_offset(settings.TIMEZONE)
Review comment:
Does any of this actually need changing?
Can't we keep `self.timezone` as an actual timezone object?
##########
File path: airflow/utils/timezone.py
##########
@@ -184,3 +185,38 @@ def coerce_datetime(v: Union[None, dt.datetime, DateTime])
-> Optional[DateTime]
if v.tzinfo is None:
v = make_aware(v)
return pendulum.instance(v)
+
+
+def _get_tzname_or_offset(obj):
+ if _is_valid_pendulum_tzname(obj):
+ return obj
+ if hasattr(obj, 'name'):
+ candidate = obj.name
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ try:
+ candidate = obj.tzname(dt.datetime.now(dt.timezone.utc))
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ except (NotImplementedError, ValueError):
+ pass
+
+ try:
+ candidate =
int(obj.utcoffset(dt.datetime.now(dt.timezone.utc)).total_seconds())
Review comment:
> The risk is that someone uses some custom datetime.tzinfo with a
custom name with no equivalent in pendulum
I think so long as we document "don't do this" I'm okay with this. The
chance of someone creating a custom timezone seems _very_ slim given Airflow's
typical users.
##########
File path: airflow/utils/timezone.py
##########
@@ -184,3 +185,38 @@ def coerce_datetime(v: Union[None, dt.datetime, DateTime])
-> Optional[DateTime]
if v.tzinfo is None:
v = make_aware(v)
return pendulum.instance(v)
+
+
+def _get_tzname_or_offset(obj):
+ if _is_valid_pendulum_tzname(obj):
+ return obj
+ if hasattr(obj, 'name'):
+ candidate = obj.name
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ try:
+ candidate = obj.tzname(dt.datetime.now(dt.timezone.utc))
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ except (NotImplementedError, ValueError):
+ pass
+
+ try:
+ candidate =
int(obj.utcoffset(dt.datetime.now(dt.timezone.utc)).total_seconds())
Review comment:
https://airflow.apache.org/docs/apache-airflow/stable/timezone.html?highlight=timezone#time-zone-aware-dags
would be the place to put such a comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]