[AIRFLOW-1802] Convert database fields to timezone aware
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b658c78f Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b658c78f Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b658c78f Branch: refs/heads/master Commit: b658c78f6705415f444bd206cc02cd51219b3f8d Parents: 59aba30 Author: Bolke de Bruin <[email protected]> Authored: Fri Nov 10 22:36:31 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Mon Nov 27 15:53:03 2017 +0100 ---------------------------------------------------------------------- .../0e2a74e0fc9f_add_time_zone_awareness.py | 213 +++++++++++++++++++ airflow/settings.py | 15 ++ 2 files changed, 228 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b658c78f/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py ---------------------------------------------------------------------- diff --git a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py new file mode 100644 index 0000000..bb65c1c --- /dev/null +++ b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py @@ -0,0 +1,213 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+"""Add time zone awareness
+
+Revision ID: 0e2a74e0fc9f
+Revises: d2ae31099d61
+Create Date: 2017-11-10 22:22:31.326152
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '0e2a74e0fc9f'
+down_revision = 'd2ae31099d61'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+from sqlalchemy.dialects import mysql
+import sqlalchemy as sa
+
+
+def upgrade():
+    conn = op.get_bind()
+    if conn.dialect.name == 'mysql':
+        conn.execute("SET time_zone = '+00:00'")
+        op.alter_column(table_name='chart', column_name='last_modified', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='import_error', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='job', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='job', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='log', column_name='dttm', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='log', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.TIMESTAMP(fsp=6))
+
+        op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
+        op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+    else:
+        # sqlite datetime is fine as is, so no conversion is needed
+        if conn.dialect.name == 'sqlite':
+            return
+
+        # we try to be database agnostic, but not every db (e.g. sqlserver)
+        # supports per session time zones
+        if conn.dialect.name == 'postgresql':
+            conn.execute("set timezone=UTC")
+
+        op.alter_column(table_name='chart', column_name='last_modified', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='dag', column_name='last_pickled', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='dag', column_name='last_expired', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='dag_run', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='dag_run', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='import_error', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='job', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='job', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='known_event', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='known_event', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='log', column_name='dttm', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='log', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='task_fail', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='task_fail', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='task_fail', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False)
+        op.alter_column(table_name='task_instance', column_name='start_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='task_instance', column_name='end_date', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=sa.TIMESTAMP(timezone=True))
+
+        op.alter_column(table_name='xcom', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True))
+        op.alter_column(table_name='xcom', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True))
+
+
+def downgrade():
+    conn = op.get_bind()
+    if conn.dialect.name == 'mysql':
+        conn.execute("SET time_zone = '+00:00'")
+        op.alter_column(table_name='chart', column_name='last_modified', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='import_error', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='job', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='job', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False)
+        op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME(fsp=6))
+
+        op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.DATETIME(fsp=6))
+        op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.DATETIME(fsp=6))
+    else:
+        if conn.dialect.name == 'sqlite':
+            return
+
+        # we try to be database agnostic, but not every db (e.g. sqlserver)
+        # supports per session time zones
+        if conn.dialect.name == 'postgresql':
+            conn.execute("set timezone=UTC")
+
+        op.alter_column(table_name='chart', column_name='last_modified', type_=sa.DateTime())
+
+        op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=sa.DateTime())
+        op.alter_column(table_name='dag', column_name='last_pickled', type_=sa.DateTime())
+        op.alter_column(table_name='dag', column_name='last_expired', type_=sa.DateTime())
+
+        op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=sa.DateTime())
+
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=sa.DateTime())
+        op.alter_column(table_name='dag_run', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='dag_run', column_name='end_date', type_=sa.DateTime())
+
+        op.alter_column(table_name='import_error', column_name='timestamp', type_=sa.DateTime())
+
+        op.alter_column(table_name='job', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='job', column_name='end_date', type_=sa.DateTime())
+        op.alter_column(table_name='job', column_name='latest_heartbeat', type_=sa.DateTime())
+
+        op.alter_column(table_name='known_event', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='known_event', column_name='end_date', type_=sa.DateTime())
+
+        op.alter_column(table_name='log', column_name='dttm', type_=sa.DateTime())
+        op.alter_column(table_name='log', column_name='execution_date', type_=sa.DateTime())
+
+        op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.DateTime(), nullable=False)
+        op.alter_column(table_name='sla_miss', column_name='timestamp', type_=sa.DateTime())
+
+        op.alter_column(table_name='task_fail', column_name='execution_date', type_=sa.DateTime())
+        op.alter_column(table_name='task_fail', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='task_fail', column_name='end_date', type_=sa.DateTime())
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.DateTime(), nullable=False)
+        op.alter_column(table_name='task_instance', column_name='start_date', type_=sa.DateTime())
+        op.alter_column(table_name='task_instance', column_name='end_date', type_=sa.DateTime())
+        op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=sa.DateTime())
+
+        op.alter_column(table_name='xcom', column_name='timestamp', type_=sa.DateTime())
+        op.alter_column(table_name='xcom', column_name='execution_date', type_=sa.DateTime())
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b658c78f/airflow/settings.py ---------------------------------------------------------------------- diff --git a/airflow/settings.py b/airflow/settings.py index 0dfbb15..ceb9b50 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -138,6 +138,20 @@ def configure_orm(disable_connection_pool=False): sessionmaker(autocommit=False, autoflush=False, bind=engine)) +def configure_adapters(): + from pendulum import Pendulum + try: + from sqlite3 import register_adapter + register_adapter(Pendulum, lambda val: val.isoformat(' ')) + except ImportError: + pass + try: + import MySQLdb.converters + MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal + except ImportError: + pass + + + try: from airflow_local_settings import * @@ -147,6 +161,7 @@ except: configure_logging() configure_vars() +configure_adapters() configure_orm() # Const stuff
