This is an automated email from the ASF dual-hosted git repository. jhtimmins pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ae4ab586b8bb12f0827505d05c88ac924734410a Author: Tzu-ping Chung <[email protected]> AuthorDate: Wed Jul 7 19:04:15 2021 +0800 Don't check execution_date in refresh_from_db (#16809) The native sqlalchemy DateTime type does not compare well when timezones don't match. This can happen if the current execution_date on a DagRun instance is not in UTC (the db entry is always in UTC). Since DagRun has a unique constraint on (dag_id, run_id), these two should be able to return one unique result, and the execution_date column should not be needed anyway. Let's just remove that filter to prevent all the datetime comparison trouble. (cherry picked from commit faffaec73385db3c6910d31ccea9fc4f9f3f9d42) --- airflow/models/dagrun.py | 29 ++--------------------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index f29d6ac..07f309d 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -18,19 +18,7 @@ from datetime import datetime from typing import TYPE_CHECKING, Any, Iterable, List, NamedTuple, Optional, Tuple, Union -from sqlalchemy import ( - Boolean, - Column, - DateTime, - Index, - Integer, - PickleType, - String, - UniqueConstraint, - and_, - func, - or_, -) +from sqlalchemy import Boolean, Column, Index, Integer, PickleType, String, UniqueConstraint, and_, func, or_ from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.orm import backref, relationship, synonym @@ -165,20 +153,7 @@ class DagRun(Base, LoggingMixin): :param session: database session :type session: Session """ - DR = DagRun - - exec_date = func.cast(self.execution_date, DateTime) - - dr = ( - session.query(DR) - .filter( - DR.dag_id == self.dag_id, - func.cast(DR.execution_date, DateTime) == exec_date, - DR.run_id == self.run_id, - ) - .one() - ) - + dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, DagRun.run_id == self.run_id).one() self.id = 
dr.id self.state = dr.state
