ashb commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r660466253
##########
File path: airflow/models/dag.py
##########
@@ -107,6 +108,40 @@ def get_last_dagrun(dag_id, session,
include_externally_triggered=False):
return query.first()
+def _get_tzname_or_offset(obj):
Review comment:
Should live in airflow.utils.timezone
##########
File path: airflow/models/dag.py
##########
@@ -107,6 +108,40 @@ def get_last_dagrun(dag_id, session,
include_externally_triggered=False):
return query.first()
+def _get_tzname_or_offset(obj):
+ if _is_valid_pendulum_tzname(obj):
+ return obj
+ if hasattr(obj, 'name'):
+ candidate = obj.name
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ try:
+ candidate = obj.tzname(datetime.now(timezone.utc))
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ except (NotImplementedError, ValueError):
+ pass
+
+ try:
+ candidate =
int(obj.utcoffset(datetime.now(timezone.utc)).total_seconds())
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ except (NotImplementedError, ValueError):
+ pass
+
+ raise ValueError(f"Can't get a timezone name or offset from {obj}")
+
+
+def _is_valid_pendulum_tzname(name):
+ if isinstance(name, str) or isinstance(name, int):
Review comment:
```suggestion
if isinstance(name, (str, int)):
```
##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -712,7 +714,8 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) ->
'SerializedDAG':
v = {task["task_id"]:
SerializedBaseOperator.deserialize_operator(task) for task in v}
k = "task_dict"
elif k == "timezone":
- v = cls._deserialize_timezone(v)
+ dag._schedule_timezone = v
Review comment:
Not needed if we keep `dag.timezone` as an attribute.
##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,7 +186,9 @@ def serialize_to_json(
if cls._is_excluded(value, key, object_to_serialize):
continue
- if key in decorated_fields:
+ if key == 'timezone':
+ serialized_object[key] =
cls._serialize(object_to_serialize._schedule_timezone)
+ elif key in decorated_fields:
Review comment:
We shouldn't need this here, handle it in `def _serialize` instead.
##########
File path: airflow/models/dag.py
##########
@@ -107,6 +108,40 @@ def get_last_dagrun(dag_id, session,
include_externally_triggered=False):
return query.first()
+def _get_tzname_or_offset(obj):
+ if _is_valid_pendulum_tzname(obj):
+ return obj
+ if hasattr(obj, 'name'):
+ candidate = obj.name
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ try:
+ candidate = obj.tzname(datetime.now(timezone.utc))
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ except (NotImplementedError, ValueError):
+ pass
+
+ try:
+ candidate =
int(obj.utcoffset(datetime.now(timezone.utc)).total_seconds())
+ if _is_valid_pendulum_tzname(candidate):
+ return candidate
+ except (NotImplementedError, ValueError):
+ pass
+
+ raise ValueError(f"Can't get a timezone name or offset from {obj}")
+
+
+def _is_valid_pendulum_tzname(name):
+ if isinstance(name, str) or isinstance(name, int):
+ try:
+ return pendulum.timezone(name)
Review comment:
```suggestion
pendulum.timezone(name)
return True
```
##########
File path: airflow/models/dag.py
##########
@@ -2209,7 +2249,7 @@ def get_num_task_instances(dag_id, task_ids=None,
states=None, session=None):
def get_serialized_fields(cls):
"""Stringified DAGs and operators contain exactly these fields."""
if not cls.__serialized_fields:
- cls.__serialized_fields =
frozenset(vars(DAG(dag_id='test')).keys()) - {
+ cls.__serialized_fields =
frozenset(set(vars(DAG(dag_id='test')).keys())) - {
Review comment:
Unnecessary change if we keep the timezone as an actual property.
##########
File path: airflow/models/dag.py
##########
@@ -304,14 +339,15 @@ def __init__(
# set timezone from start_date
if start_date and start_date.tzinfo:
- self.timezone = start_date.tzinfo
+ self._schedule_timezone = _get_tzname_or_offset(start_date.tzinfo)
elif 'start_date' in self.default_args and
self.default_args['start_date']:
if isinstance(self.default_args['start_date'], str):
self.default_args['start_date'] =
timezone.parse(self.default_args['start_date'])
- self.timezone = self.default_args['start_date'].tzinfo
+ if self.default_args['start_date'] and
self.default_args['start_date'].tzinfo:
+ self._schedule_timezone =
_get_tzname_or_offset(self.default_args['start_date'].tzinfo)
- if not hasattr(self, 'timezone') or not self.timezone:
- self.timezone = settings.TIMEZONE
+ if not hasattr(self, '_schedule_timezone'):
+ self._schedule_timezone = _get_tzname_or_offset(settings.TIMEZONE)
Review comment:
Does any of this actually need changing?
Can't we keep `self.timezone` as an actual timezone object?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]