ecerulm commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r659855128
##########
File path: airflow/models/dag.py
##########
@@ -302,16 +304,20 @@ def __init__(
self.fileloc = back.f_code.co_filename if back else ""
self.task_dict: Dict[str, BaseOperator] = {}
- # set timezone from start_date
+ # set timezone from schedule_timezone or start_date
+ if schedule_timezone:
+ self.schedule_timezone = self._get_tzname_or_offset(schedule_timezone)
Review comment:
I don't think we really want to check that the `start_date` time zone
matches the `schedule_timezone`. That's why I wanted to introduce
`schedule_timezone`: to avoid having to rely on inferring things from
`start_date.tzinfo`.
There is no problem with specifying the `start_date` in one time zone, the
`end_date` in another, and yet another time zone for
`schedule_timezone`. I can see people (myself included) using an ISO 8601 string
like `2021-06-01T12:00:00+01:00` for `start_date` and
`schedule_timezone='Europe/Stockholm'`.
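To illustrate why `start_date.tzinfo` is a poor source for the schedule time zone, here is a minimal sketch using only the standard library (`zoneinfo`, Python 3.9+, assuming system tz data is available) rather than pendulum, which Airflow itself uses. `start_date` and `schedule_tz` are plain local variables here, not DAG arguments. A fixed `+01:00` offset parsed from an ISO 8601 string carries no DST rules, while `Europe/Stockholm` switches between +01:00 and +02:00 during the year.

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

# start_date given as an ISO 8601 string with a fixed +01:00 offset
start_date = datetime.fromisoformat("2021-06-01T12:00:00+01:00")
# Schedule time zone given as an IANA name (standalone variable, not the DAG argument)
schedule_tz = ZoneInfo("Europe/Stockholm")

# The fixed offset has no DST rules: it is +01:00 all year round.
print(start_date.utcoffset())  # 1:00:00

# The same instant viewed in the schedule time zone (CEST in June, +02:00).
local = start_date.astimezone(schedule_tz)
print(local.isoformat())  # 2021-06-01T13:00:00+02:00

# The named zone changes offset across the year; a fixed offset cannot.
winter = datetime(2021, 1, 1, 12, tzinfo=schedule_tz)
print(winter.utcoffset())  # 1:00:00
print(local.utcoffset())   # 2:00:00
```

Both values describe the same instant, so mixing time zones between `start_date` and the schedule is harmless as long as the schedule's zone is stated explicitly instead of inferred.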
##########
File path: airflow/models/dag.py
##########
@@ -302,16 +304,20 @@ def __init__(
self.fileloc = back.f_code.co_filename if back else ""
self.task_dict: Dict[str, BaseOperator] = {}
- # set timezone from start_date
+ # set timezone from schedule_timezone or start_date
+ if schedule_timezone:
+ self.schedule_timezone = self._get_tzname_or_offset(schedule_timezone)
Review comment:
But anyway, I'm removing `schedule_timezone` from the PR, right? It
changes the public API of the DAG, and that requires an AIP, right?
##########
File path: airflow/models/dag.py
##########
@@ -422,6 +428,38 @@ def __exit__(self, _type, _value, _tb):
# /Context Manager ----------------------------------------------
+ def _get_tzname_or_offset(self, obj):
Review comment:
OK, I moved it to the module level.
##########
File path: airflow/models/dag.py
##########
@@ -422,6 +428,38 @@ def __exit__(self, _type, _value, _tb):
# /Context Manager ----------------------------------------------
+ def _get_tzname_or_offset(self, obj):
+ if self._is_valid_pendulum_tz_id(obj):
+ return obj
+ if hasattr(obj, 'name'):
+ candidate = obj.name
+ if self._is_valid_pendulum_tz_id(candidate):
+ return candidate
+ try:
+ candidate = obj.tzname(datetime.now(timezone.utc))
+ if self._is_valid_pendulum_tz_id(candidate):
+ return candidate
+ except (NotImplementedError, ValueError):
+ pass
+
+ try:
+ candidate = int(obj.utcoffset(datetime.now(timezone.utc)).total_seconds())
+ if self._is_valid_pendulum_tz_id(candidate):
+ return candidate
+ except (NotImplementedError, ValueError):
+ pass
+
+ raise ValueError(f"Can't get a timezone name or offset from {obj}")
+
+ def _is_valid_pendulum_tz_id(self, id):
Review comment:
OK, renamed it to `is_valid_pendulum_tzname` and its `id` parameter to `name`.
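For reference, a standalone sketch of the fallback order in `_get_tzname_or_offset` from the hunk above (a valid name as-is, then `.name`, then `tzname()`, then the integer UTC offset in seconds), written as module-level functions. Assumptions: `is_valid_tzname` is a hypothetical stdlib stand-in for the PR's pendulum-based check (it accepts names that `zoneinfo` can load and integer offsets under 24 hours), and the extra `key` lookup for `zoneinfo.ZoneInfo` objects is an addition not present in the PR.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError


def is_valid_tzname(name):
    """Hypothetical stand-in for the PR's pendulum check: accepts IANA names
    that zoneinfo can load, and integer UTC offsets in seconds (< 24h)."""
    if isinstance(name, bool):
        return False
    if isinstance(name, int):
        return -86400 < name < 86400
    if not isinstance(name, str):
        return False
    try:
        ZoneInfo(name)
    except (ZoneInfoNotFoundError, ValueError, OSError):
        return False
    return True


def get_tzname_or_offset(obj):
    # 1. Already a valid name (or integer offset): return it unchanged.
    if is_valid_tzname(obj):
        return obj
    # 2. pendulum timezones expose .name; .key (zoneinfo) is an addition here.
    for attr in ("name", "key"):
        candidate = getattr(obj, attr, None)
        if is_valid_tzname(candidate):
            return candidate
    now = datetime.now(timezone.utc)
    # 3. tzname() may give a loadable name such as "UTC".
    try:
        candidate = obj.tzname(now)
        if is_valid_tzname(candidate):
            return candidate
    except (AttributeError, NotImplementedError, ValueError):
        pass
    # 4. Fall back to the current UTC offset in whole seconds.
    try:
        candidate = int(obj.utcoffset(now).total_seconds())
        if is_valid_tzname(candidate):
            return candidate
    except (AttributeError, NotImplementedError, ValueError, TypeError):
        pass
    raise ValueError(f"Can't get a timezone name or offset from {obj!r}")
```

A fixed-offset `timezone(timedelta(hours=1))` has no loadable name (its `tzname()` is `"UTC+01:00"`), so it falls through to step 4 and resolves to `3600`.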
##########
File path: airflow/models/dag.py
##########
@@ -302,16 +304,20 @@ def __init__(
self.fileloc = back.f_code.co_filename if back else ""
self.task_dict: Dict[str, BaseOperator] = {}
- # set timezone from start_date
+ # set timezone from schedule_timezone or start_date
+ if schedule_timezone:
+ self.schedule_timezone = self._get_tzname_or_offset(schedule_timezone)
Review comment:
OK, removed `schedule_timezone` from the DAG constructor.
##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -135,7 +136,7 @@
'label': 'custom_task',
},
],
- "timezone": "UTC",
+ "schedule_timezone": "UTC",
Review comment:
OK, I managed to keep it as `timezone` in the serialized object, but this is
getting more and more complex, and more hacky than the alternative #16631.
--
This is an automated message from the Apache Git Service.