[AIRFLOW-1807] Force use of time zone aware db fields

This change checks that all datetimes being stored are indeed timezone aware.
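The practical difference the new column type enforces is whether a value carries tzinfo. A minimal standard-library sketch of the distinction (variable names are illustrative, not taken from the commit):

    from datetime import datetime, timezone

    # Naive: no tzinfo attached; a UTC-enforcing column type is meant to reject this.
    naive = datetime(2017, 11, 27, 15, 54, 27)
    print(naive.tzinfo)  # None

    # Aware: carries an explicit UTC offset and can be stored safely.
    aware = datetime(2017, 11, 27, 15, 54, 27, tzinfo=timezone.utc)
    print(aware.tzinfo)  # UTC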
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2f168634
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2f168634
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2f168634

Branch: refs/heads/master
Commit: 2f168634aac4aa138f00634bbb9f4e3993346ffa
Parents: c857436
Author: Bolke de Bruin <[email protected]>
Authored: Sat Nov 11 13:32:02 2017 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Mon Nov 27 15:54:27 2017 +0100

----------------------------------------------------------------------
 airflow/jobs.py      |  9 +++++----
 airflow/models.py    | 47 ++++++++++++++++++++++++-----------------------
 airflow/www/utils.py |  8 ++++++++
 airflow/www/views.py |  7 +++++++
 setup.py             |  1 +
 5 files changed, 45 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 4e1864e..868e785 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -33,9 +33,10 @@
 import datetime
 from collections import defaultdict
 from past.builtins import basestring
 from sqlalchemy import (
-    Column, Integer, String, DateTime, func, Index, or_, and_, not_)
+    Column, Integer, String, func, Index, or_, and_, not_)
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import make_transient
+from sqlalchemy_utc import UtcDateTime
 from tabulate import tabulate
 from time import sleep
@@ -77,9 +78,9 @@ class BaseJob(Base, LoggingMixin):
     dag_id = Column(String(ID_LEN),)
     state = Column(String(20))
     job_type = Column(String(30))
-    start_date = Column(DateTime())
-    end_date = Column(DateTime())
-    latest_heartbeat = Column(DateTime())
+    start_date = Column(UtcDateTime())
+    end_date = Column(UtcDateTime())
+    latest_heartbeat = Column(UtcDateTime())
     executor_class = Column(String(500))
     hostname = Column(String(500))
     unixname = Column(String(1000))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index fe62ac5..f8a5f0f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -55,6 +55,7 @@
 from sqlalchemy import func, or_, and_
 from sqlalchemy.ext.declarative import declarative_base, declared_attr
 from sqlalchemy.dialects.mysql import LONGTEXT
 from sqlalchemy.orm import reconstructor, relationship, synonym
+from sqlalchemy_utc import UtcDateTime
 from croniter import croniter
 import six
@@ -732,7 +733,7 @@ class DagPickle(Base):
     """
     id = Column(Integer, primary_key=True)
     pickle = Column(PickleType(pickler=dill))
-    created_dttm = Column(DateTime, default=func.now())
+    created_dttm = Column(UtcDateTime, default=func.now())
     pickle_hash = Column(Text)
 
     __tablename__ = "dag_pickle"
@@ -763,9 +764,9 @@ class TaskInstance(Base, LoggingMixin):
 
     task_id = Column(String(ID_LEN), primary_key=True)
     dag_id = Column(String(ID_LEN), primary_key=True)
-    execution_date = Column(DateTime, primary_key=True)
-    start_date = Column(DateTime)
-    end_date = Column(DateTime)
+    execution_date = Column(UtcDateTime, primary_key=True)
+    start_date = Column(UtcDateTime)
+    end_date = Column(UtcDateTime)
     duration = Column(Float)
     state = Column(String(20))
     try_number = Column(Integer, default=0)
@@ -777,7 +778,7 @@ class TaskInstance(Base, LoggingMixin):
     queue = Column(String(50))
     priority_weight = Column(Integer)
     operator = Column(String(1000))
-    queued_dttm = Column(DateTime)
+    queued_dttm = Column(UtcDateTime)
     pid = Column(Integer)
 
     __table_args__ = (
@@ -1862,9 +1863,9 @@ class TaskFail(Base):
 
     task_id = Column(String(ID_LEN), primary_key=True)
     dag_id = Column(String(ID_LEN), primary_key=True)
-    execution_date = Column(DateTime, primary_key=True)
-    start_date = Column(DateTime)
-    end_date = Column(DateTime)
+    execution_date = Column(UtcDateTime, primary_key=True)
+    start_date = Column(UtcDateTime)
+    end_date = Column(UtcDateTime)
     duration = Column(Float)
 
     def __init__(self, task, execution_date, start_date, end_date):
@@ -1884,11 +1885,11 @@ class Log(Base):
 
     __tablename__ = "log"
 
     id = Column(Integer, primary_key=True)
-    dttm = Column(DateTime)
+    dttm = Column(UtcDateTime)
     dag_id = Column(String(ID_LEN))
     task_id = Column(String(ID_LEN))
     event = Column(String(30))
-    execution_date = Column(DateTime)
+    execution_date = Column(UtcDateTime)
     owner = Column(String(500))
     extra = Column(Text)
@@ -2741,12 +2742,12 @@ class DagModel(Base):
     # Whether that DAG was seen on the last DagBag load
     is_active = Column(Boolean, default=False)
     # Last time the scheduler started
-    last_scheduler_run = Column(DateTime)
+    last_scheduler_run = Column(UtcDateTime)
     # Last time this DAG was pickled
-    last_pickled = Column(DateTime)
+    last_pickled = Column(UtcDateTime)
     # Time when the DAG last received a refresh signal
     # (e.g. the DAG's "refresh" button was clicked in the web UI)
-    last_expired = Column(DateTime)
+    last_expired = Column(UtcDateTime)
     # Whether (one of) the scheduler is scheduling this DAG at the moment
     scheduler_lock = Column(Boolean)
     # Foreign key to the latest pickle_id
@@ -3904,7 +3905,7 @@ class Chart(Base):
         "User", cascade=False, cascade_backrefs=False, backref='charts')
     x_is_date = Column(Boolean, default=True)
     iteration_no = Column(Integer, default=0)
-    last_modified = Column(DateTime, default=func.now())
+    last_modified = Column(UtcDateTime, default=func.now())
 
     def __repr__(self):
         return self.label
@@ -3925,8 +3926,8 @@ class KnownEvent(Base):
 
     id = Column(Integer, primary_key=True)
     label = Column(String(200))
-    start_date = Column(DateTime)
-    end_date = Column(DateTime)
+    start_date = Column(UtcDateTime)
+    end_date = Column(UtcDateTime)
     user_id = Column(Integer(), ForeignKey('users.id'),)
     known_event_type_id = Column(Integer(), ForeignKey('known_event_type.id'),)
     reported_by = relationship(
@@ -4054,7 +4055,7 @@ class XCom(Base, LoggingMixin):
     value = Column(LargeBinary)
     timestamp = Column(
         DateTime, default=func.now(), nullable=False)
-    execution_date = Column(DateTime, nullable=False)
+    execution_date = Column(UtcDateTime, nullable=False)
 
     # source information
     task_id = Column(String(ID_LEN), nullable=False)
@@ -4372,9 +4373,9 @@ class DagRun(Base, LoggingMixin):
 
     id = Column(Integer, primary_key=True)
     dag_id = Column(String(ID_LEN))
-    execution_date = Column(DateTime, default=func.now())
-    start_date = Column(DateTime, default=func.now())
-    end_date = Column(DateTime)
+    execution_date = Column(UtcDateTime, default=func.now())
+    start_date = Column(UtcDateTime, default=func.now())
+    end_date = Column(UtcDateTime)
     _state = Column('state', String(50), default=State.RUNNING)
     run_id = Column(String(ID_LEN))
     external_trigger = Column(Boolean, default=True)
@@ -4790,9 +4791,9 @@ class SlaMiss(Base):
 
     task_id = Column(String(ID_LEN), primary_key=True)
     dag_id = Column(String(ID_LEN), primary_key=True)
-    execution_date = Column(DateTime, primary_key=True)
+    execution_date = Column(UtcDateTime, primary_key=True)
     email_sent = Column(Boolean, default=False)
-    timestamp = Column(DateTime)
+    timestamp = Column(UtcDateTime)
     description = Column(Text)
     notification_sent = Column(Boolean, default=False)
@@ -4804,6 +4805,6 @@ class SlaMiss(Base):
 class ImportError(Base):
     __tablename__ = "import_error"
     id = Column(Integer, primary_key=True)
-    timestamp = Column(DateTime)
+    timestamp = Column(UtcDateTime)
     filename = Column(String(1024))
     stacktrace = Column(Text)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/airflow/www/utils.py
----------------------------------------------------------------------
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index ae1fb5f..aba85fa 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -26,6 +26,8 @@
 import json
 import time
 from flask import after_this_request, request, Response
+from flask_admin.contrib.sqla.filters import FilterConverter
+from flask_admin.model import filters
 from flask_login import current_user
 import wtforms
 from wtforms.compat import text_type
@@ -386,3 +388,9 @@ class AceEditorWidget(wtforms.widgets.TextArea):
             form_name=field.id,
         )
         return wtforms.widgets.core.HTMLString(html)
+
+
+class UtcFilterConverter(FilterConverter):
+    @filters.convert('utcdatetime')
+    def conv_utcdatetime(self, column, name, **kwargs):
+        return self.conv_datetime(column, name, **kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index a6378bf..550a7f8 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2055,6 +2055,7 @@ class SlaMissModelView(wwwutils.SuperUserMixin, ModelViewOnly):
     column_searchable_list = ('dag_id', 'task_id',)
     column_filters = (
         'dag_id', 'task_id', 'email_sent', 'timestamp', 'execution_date')
+    filter_converter = wwwutils.UtcFilterConverter()
     form_widget_args = {
         'email_sent': {'disabled': True},
         'timestamp': {'disabled': True},
@@ -2349,6 +2350,7 @@ class XComView(wwwutils.SuperUserMixin, AirflowModelView):
     column_filters = ('key', 'timestamp', 'execution_date', 'task_id', 'dag_id')
     column_searchable_list = ('key', 'timestamp', 'execution_date', 'task_id', 'dag_id')
+    filter_converter = wwwutils.UtcFilterConverter()
 
 
 class JobModelView(ModelViewOnly):
@@ -2365,6 +2367,7 @@ class JobModelView(ModelViewOnly):
         hostname=nobr_f,
         state=state_f,
         latest_heartbeat=datetime_f)
+    filter_converter = wwwutils.UtcFilterConverter()
 
 
 class DagRunModelView(ModelViewOnly):
@@ -2387,6 +2390,7 @@ class DagRunModelView(ModelViewOnly):
     column_list = (
         'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger')
     column_filters = column_list
+    filter_converter = wwwutils.UtcFilterConverter()
     column_searchable_list = ('dag_id', 'state', 'run_id')
     column_formatters = dict(
         execution_date=datetime_f,
@@ -2453,6 +2457,7 @@ class LogModelView(ModelViewOnly):
     column_display_actions = False
     column_default_sort = ('dttm', True)
     column_filters = ('dag_id', 'task_id', 'execution_date')
+    filter_converter = wwwutils.UtcFilterConverter()
     column_formatters = dict(
         dttm=datetime_f, execution_date=datetime_f, dag_id=dag_link)
@@ -2463,6 +2468,7 @@ class TaskInstanceModelView(ModelViewOnly):
     column_filters = (
         'state', 'dag_id', 'task_id', 'execution_date', 'hostname',
         'queue', 'pool', 'operator', 'start_date', 'end_date')
+    filter_converter = wwwutils.UtcFilterConverter()
     named_filter_urls = True
     column_formatters = dict(
         log_url=log_url_formatter,
@@ -2752,6 +2758,7 @@ class DagModelView(wwwutils.SuperUserMixin, ModelView):
     column_filters = (
         'dag_id', 'owners', 'is_paused', 'is_active', 'is_subdag',
         'last_scheduler_run', 'last_expired')
+    filter_converter = wwwutils.UtcFilterConverter()
     form_widget_args = {
         'last_scheduler_run': {'disabled': True},
         'fileloc': {'disabled': True},

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f168634/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index de2bd54..cfe0d92 100644
--- a/setup.py
+++ b/setup.py
@@ -236,6 +236,7 @@ def do_setup():
         'requests>=2.5.1, <3',
         'setproctitle>=1.1.8, <2',
         'sqlalchemy>=0.9.8',
+        'sqlalchemy-utc>=0.9.0',
         'tabulate>=0.7.5, <0.8.0',
         'thrift>=0.9.2',
         'tzlocal>=1.4',
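
For readers unfamiliar with sqlalchemy-utc: the UtcDateTime type imported above is essentially a SQLAlchemy TypeDecorator that refuses naive datetimes and normalizes aware ones to UTC. The sketch below is an illustrative reimplementation under the hypothetical name AwareDateTime, not the library's actual code:

    from datetime import timezone

    from sqlalchemy.types import DateTime, TypeDecorator


    class AwareDateTime(TypeDecorator):
        """Illustrative timezone-enforcing column type (hypothetical, not sqlalchemy-utc)."""

        impl = DateTime(timezone=True)

        def process_bind_param(self, value, dialect):
            # Called on the way into the database.
            if value is None:
                return value
            if value.tzinfo is None:
                raise ValueError("naive datetime is not allowed: %r" % value)
            return value.astimezone(timezone.utc)

        def process_result_value(self, value, dialect):
            # Some backends hand back naive datetimes; re-attach UTC on the way out.
            if value is not None and value.tzinfo is None:
                value = value.replace(tzinfo=timezone.utc)
            return value

The UtcFilterConverter added to airflow/www/utils.py is the matching web-UI piece: Flask-Admin's FilterConverter dispatches on the column type name, so mapping the new 'utcdatetime' type onto the stock datetime filters keeps list-view filtering working for the converted columns.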
