[ 
https://issues.apache.org/jira/browse/AIRFLOW-3079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633063#comment-16633063
 ] 

ASF GitHub Bot commented on AIRFLOW-3079:
-----------------------------------------

ashb closed pull request #3964: [AIRFLOW-3079] Improve migration scripts to support MSSQL Server
URL: https://github.com/apache/incubator-airflow/pull/3964
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py 
b/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py
index 6eef6a9437..643a1ca81b 100644
--- a/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py
+++ b/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py
@@ -39,6 +39,12 @@
 TABLE_NAME = 'task_reschedule'
 INDEX_NAME = 'idx_' + TABLE_NAME + '_dag_task_date'
 
+# For Microsoft SQL Server, TIMESTAMP is a row-id type,
+# having nothing to do with date-time.  DateTime() will
+# be sufficient.
+def mssql_timestamp():
+    return sa.DateTime()
+
 def mysql_timestamp():
     return mysql.TIMESTAMP(fsp=6)
 
@@ -50,6 +56,8 @@ def upgrade():
     conn = op.get_bind()
     if conn.dialect.name == 'mysql':
         timestamp = mysql_timestamp
+    elif conn.dialect.name == 'mssql':
+        timestamp = mssql_timestamp
     else:
         timestamp = sa_timestamp
 
diff --git 
a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py 
b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
index 567172f9bf..ac7824fd9e 100644
--- a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
+++ b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
@@ -86,8 +86,8 @@ def upgrade():
         op.alter_column(table_name='xcom', column_name='timestamp', 
type_=mysql.TIMESTAMP(fsp=6))
         op.alter_column(table_name='xcom', column_name='execution_date', 
type_=mysql.TIMESTAMP(fsp=6))
     else:
-        # sqlite datetime is fine as is not converting
-        if conn.dialect.name == 'sqlite':
+        # sqlite and mssql datetime are fine as is.  Therefore, not converting
+        if conn.dialect.name in ('sqlite', 'mssql'):
             return
 
         # we try to be database agnostic, but not every db (e.g. sqlserver)
@@ -182,7 +182,7 @@ def downgrade():
         op.alter_column(table_name='xcom', column_name='DATETIME', 
type_=mysql.DATETIME(fsp=6))
         op.alter_column(table_name='xcom', column_name='execution_date', 
type_=mysql.DATETIME(fsp=6))
     else:
-        if conn.dialect.name == 'sqlite':
+        if conn.dialect.name in ('sqlite', 'mssql'):
             return
 
         # we try to be database agnostic, but not every db (e.g. sqlserver)
diff --git 
a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
 
b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
index 925bf26df0..88a4199d32 100644
--- 
a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
+++ 
b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py
@@ -38,12 +38,23 @@
 
 
 def upgrade():
+
+    columns_and_constraints = [
+        sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), 
primary_key=True),
+        sa.Column("resource_version", sa.String(255))
+    ]
+
+    conn = op.get_bind()
+
+    # alembic creates an invalid SQL for mssql dialect
+    if conn.dialect.name not in ('mssql'):
+        columns_and_constraints.append(sa.CheckConstraint("one_row_id", 
name="kube_resource_version_one_row_id"))
+
     table = op.create_table(
         RESOURCE_TABLE,
-        sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), 
primary_key=True),
-        sa.Column("resource_version", sa.String(255)),
-        sa.CheckConstraint("one_row_id", 
name="kube_resource_version_one_row_id")
+        *columns_and_constraints
     )
+
     op.bulk_insert(table, [
         {"resource_version": ""}
     ])
diff --git 
a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
 
b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
index ace7845965..633201b03b 100644
--- 
a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
+++ 
b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py
@@ -38,12 +38,23 @@
 
 
 def upgrade():
+
+    columns_and_constraints = [
+        sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), 
primary_key=True),
+        sa.Column("worker_uuid", sa.String(255))
+    ]
+
+    conn = op.get_bind()
+
+    # alembic creates an invalid SQL for mssql dialect
+    if conn.dialect.name not in ('mssql'):
+        columns_and_constraints.append(sa.CheckConstraint("one_row_id", 
name="kube_worker_one_row_id"))
+
     table = op.create_table(
         RESOURCE_TABLE,
-        sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), 
primary_key=True),
-        sa.Column("worker_uuid", sa.String(255)),
-        sa.CheckConstraint("one_row_id", name="kube_worker_one_row_id")
+        *columns_and_constraints
     )
+
     op.bulk_insert(table, [
         {"worker_uuid": ""}
     ])


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> initdb fails on Microsoft SQL Server
> ------------------------------------
>
>                 Key: AIRFLOW-3079
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3079
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: database
>    Affects Versions: 1.10.0
>            Reporter: Morten Post
>            Priority: Major
>
> Running airflow initdb fails when using a Microsoft SQL Server 17 backend.
> The problem does not exist in 1.9.0.
> [*****@******** airflow]$ airflow initdb
> [2018-09-17 14:08:28,744] {settings.py:174} INFO - setting.configure_orm(): 
> Using pool settings. pool_size=5, pool_recycle=1800
> [2018-09-17 14:08:28,865] {__init__.py:51} INFO - Using executor 
> SequentialExecutor
> DB: DB: mssql+pyodbc://***************/Airflow?driver=ODBC Driver 17 for SQL 
> Server
> [2018-09-17 14:08:28,967] {db.py:338} INFO - Creating tables
> INFO [alembic.runtime.migration] Context impl MSSQLImpl.
> INFO [alembic.runtime.migration] Will assume transactional DDL.
> INFO [alembic.runtime.migration] Running upgrade -> e3a246e0dc1, current 
> schema
> INFO [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, 
> create is_encrypted
> INFO [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 
> 13eb55f81627, maintain history for compatibility with earlier migrations
> INFO [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 
> 338e90f54d61, More logging into task_isntance
> INFO [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, 
> job_id indices
> INFO [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, 
> Adding extra to Log
> INFO [alembic.runtime.migration] Running upgrade 502898887f84 -> 
> 1b38cef5b76e, add dagrun
> INFO [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 
> 2e541a1dcfed, task_duration
> INFO [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 
> 40e67319e3a9, dagrun_config
> INFO [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 
> 561833c1c74b, add password column to user
> INFO [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, 
> dagrun start end
> INFO [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, 
> Add notification_sent column to sla_miss
> INFO [alembic.runtime.migration] Running upgrade bbc73705a13e -> 
> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field 
> in connection
> INFO [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 
> 1968acfc09e3, add is_encrypted column to variable table
> INFO [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 
> 2e82aab8ef20, rename user table
> INFO [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 
> 211e584da130, add TI state index
> INFO [alembic.runtime.migration] Running upgrade 211e584da130 -> 
> 64de9cddf6c9, add task fails journal table
> INFO [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> 
> f2ca10b85618, add dag_stats table
> INFO [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 
> 4addfa1236f1, Add fractional seconds to mysql tables
> INFO [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 
> 8504051e801b, xcom dag task indices
> INFO [alembic.runtime.migration] Running upgrade 8504051e801b -> 
> 5e7d17757c7a, add pid field to TaskInstance
> INFO [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 
> 127d2bf2dfa7, Add dag_id/state index on dag_run table
> INFO [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> 
> cc1e65623dc7, add max tries column to task instance
> INFO [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> 
> bdaa763e6c56, Make xcom value column a large binary
> INFO [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 
> 947454bf1dff, add ti job_id index
> INFO [alembic.runtime.migration] Running upgrade 947454bf1dff -> 
> d2ae31099d61, Increase text size for MySQL (not relevant for other DBs' text 
> types)
> INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 
> 0e2a74e0fc9f, Add time zone awareness
> Traceback (most recent call last):
>  File "/bin/airflow", line 32, in <module>
>  args.func(args)
>  File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 1002, in 
> initdb
>  db_utils.initdb(settings.RBAC)
>  File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 92, in 
> initdb
>  upgradedb()
>  File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 346, in 
> upgradedb
>  command.upgrade(config, 'heads')
>  File "/usr/lib/python2.7/site-packages/alembic/command.py", line 174, in 
> upgrade
>  script.run_env()
>  File "/usr/lib/python2.7/site-packages/alembic/script/base.py", line 416, in 
> run_env
>  util.load_python_file(self.dir, 'env.py')
>  File "/usr/lib/python2.7/site-packages/alembic/util/pyfiles.py", line 93, in 
> load_python_file
>  module = load_module_py(module_id, path)
>  File "/usr/lib/python2.7/site-packages/alembic/util/compat.py", line 79, in 
> load_module_py
>  mod = imp.load_source(module_id, path, fp)
>  File "/usr/lib/python2.7/site-packages/airflow/migrations/env.py", line 91, 
> in <module>
>  run_migrations_online()
>  File "<string>", line 8, in run_migrations
>  File "/usr/lib/python2.7/site-packages/alembic/runtime/environment.py", line 
> 807, in run_migrations
>  self.get_context().run_migrations(**kw)
>  File "/usr/lib/python2.7/site-packages/alembic/runtime/migration.py", line 
> 321, in run_migrations
>  step.migration_fn(**kw)
>  File 
> "/usr/lib/python2.7/site-packages/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py",
>  line 96, in upgrade
>  op.alter_column(table_name='chart', column_name='last_modified', 
> type_=sa.TIMESTAMP(timezone=True))
>  File "<string>", line 8, in alter_column
>  File "<string>", line 3, in alter_column
>  File "/usr/lib/python2.7/site-packages/alembic/operations/ops.py", line 
> 1420, in alter_column
>  return operations.invoke(alt)
>  File "/usr/lib/python2.7/site-packages/alembic/operations/base.py", line 
> 318, in invoke
>  return fn(self, operation)
>  File "/usr/lib/python2.7/site-packages/alembic/operations/toimpl.py", line 
> 53, in alter_column
>  **operation.kw
>  File "/usr/lib/python2.7/site-packages/alembic/ddl/mssql.py", line 67, in 
> alter_column
>  **kw
>  File "/usr/lib/python2.7/site-packages/alembic/ddl/impl.py", line 160, in 
> alter_column
>  existing_nullable=existing_nullable,
>  File "/usr/lib/python2.7/site-packages/alembic/ddl/mssql.py", line 23, in 
> _exec
>  result = super(MSSQLImpl, self)._exec(construct, *args, **kw)
>  File "/usr/lib/python2.7/site-packages/alembic/ddl/impl.py", line 118, in 
> _exec
>  return conn.execute(construct, *multiparams, **params)
>  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 
> 945, in execute
>  return meth(self, multiparams, params)
>  File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/ddl.py", line 68, in 
> _execute_on_connection
>  return connection._execute_ddl(self, multiparams, params)
>  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 
> 1002, in _execute_ddl
>  compiled
>  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 
> 1189, in _execute_context
>  context)
>  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 
> 1402, in _handle_dbapi_exception
>  exc_info
>  File "/usr/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 
> 203, in raise_from_cause
>  reraise(type(exception), exception, tb=exc_tb, cause=cause)
>  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 
> 1182, in _execute_context
>  context)
>  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/default.py", line 
> 470, in do_execute
>  cursor.execute(statement, parameters)
> sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) ('42000', 
> u"[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Cannot alter 
> column 'last_modified' to be data type timestamp. (4927) (SQLExecDirectW)") 
> [SQL: u'ALTER TABLE chart ALTER COLUMN last_modified TIMESTAMP']



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to