This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new f2b7d05 Improve compatibility with mssql (#9973)
f2b7d05 is described below
commit f2b7d05db5042126e40767cc5a707896fb7db1de
Author: Aneesh Joseph <[email protected]>
AuthorDate: Wed May 26 14:56:51 2021 +0530
Improve compatibility with mssql (#9973)
This PR adds experimental support for MSSQL (SQL Server 2017 and 2019) as a backend database for Airflow: new CI test jobs, a Breeze/docker-compose mssql backend, and migration changes improving MSSQL compatibility.
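For local testing, the CI compose file added in this change points Airflow at SQL Server through pyodbc. As a rough sketch of the required settings (the `mssql` host name and the `sa`/`Airflow123` credentials below are just the CI defaults from backend-mssql.yml, not values to reuse elsewhere):

    AIRFLOW__CORE__SQL_ALCHEMY_CONN=mssql+pyodbc://sa:Airflow123@mssql:1433/master?driver=ODBC+Driver+17+for+SQL+Server
    AIRFLOW__CELERY__RESULT_BACKEND=db+mssql+pyodbc://sa:Airflow123@mssql:1433/master?driver=ODBC+Driver+17+for+SQL+Server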
---
.github/workflows/ci.yml | 63 +++++
BREEZE.rst | 4 +-
Dockerfile.ci | 11 +
README.md | 15 +-
airflow/api_connexion/parameters.py | 13 +-
airflow/jobs/scheduler_job.py | 45 ++--
.../83f031fd9f1c_improve_mssql_compatibility.py | 257 +++++++++++++++++++++
...d48763f6d53_add_unique_constraint_to_conn_id.py | 2 +-
...606e2_add_scheduling_decision_to_dagrun_and_.py | 28 ++-
.../bbf4a7ad0465_remove_id_column_from_xcom.py | 67 +++++-
...4a3141f0_make_xcom_pkey_columns_non_nullable.py | 76 ++++++
airflow/models/dag.py | 15 +-
airflow/models/dagcode.py | 5 +-
airflow/models/dagrun.py | 5 +-
airflow/models/serialized_dag.py | 21 +-
airflow/sensors/smart_sensor.py | 29 ++-
airflow/www/security.py | 8 +
airflow/www/views.py | 16 +-
breeze | 7 +
breeze-complete | 9 +-
docs/apache-airflow/installation.rst | 1 +
.../{base.yml => backend-mssql-port.yml} | 22 +-
.../docker-compose/{base.yml => backend-mssql.yml} | 29 +--
scripts/ci/docker-compose/base.yml | 1 +
scripts/ci/libraries/_initialization.sh | 10 +
scripts/ci/selective_ci_checks.sh | 14 +-
scripts/ci/testing/ci_run_airflow_testing.sh | 6 +
scripts/in_container/check_environment.sh | 2 +
tests/bats/breeze/test_breeze_complete.bats | 16 ++
tests/models/test_renderedtifields.py | 9 +-
tests/models/test_taskinstance.py | 14 +-
tests/utils/test_db.py | 6 +
32 files changed, 713 insertions(+), 113 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index b026840..3203fa3 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -145,6 +145,7 @@ jobs:
postgresVersions: ${{ steps.selective-checks.outputs.postgres-versions }}
defaultPostgresVersion: ${{
steps.selective-checks.outputs.default-postgres-version }}
mysqlVersions: ${{ steps.selective-checks.outputs.mysql-versions }}
+ mssqlVersions: ${{ steps.selective-checks.outputs.mssql-versions }}
defaultMySQLVersion: ${{
steps.selective-checks.outputs.default-mysql-version }}
helmVersions: ${{ steps.selective-checks.outputs.helm-versions }}
defaultHelmVersion: ${{
steps.selective-checks.outputs.default-helm-version }}
@@ -153,6 +154,7 @@ jobs:
testTypes: ${{ steps.selective-checks.outputs.test-types }}
postgresExclude: ${{ steps.selective-checks.outputs.postgres-exclude }}
mysqlExclude: ${{ steps.selective-checks.outputs.mysql-exclude }}
+ mssqlExclude: ${{ steps.selective-checks.outputs.mssql-exclude }}
sqliteExclude: ${{ steps.selective-checks.outputs.sqlite-exclude }}
run-tests: ${{ steps.selective-checks.outputs.run-tests }}
run-ui-tests: ${{ steps.selective-checks.outputs.run-ui-tests }}
@@ -762,6 +764,62 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
path: "./files/coverage*.xml"
retention-days: 7
+ tests-mssql:
+ timeout-minutes: 130
+ name: >
+ MSSQL${{matrix.mssql-version}}, Py${{matrix.python-version}}: ${{needs.build-info.outputs.testTypes}}
+ runs-on: ${{ fromJson(needs.build-info.outputs.runsOn) }}
+ needs: [build-info, ci-images]
+ strategy:
+ matrix:
+ python-version: ${{ fromJson(needs.build-info.outputs.pythonVersions) }}
+ mssql-version: ${{ fromJson(needs.build-info.outputs.mssqlVersions) }}
+ exclude: ${{ fromJson(needs.build-info.outputs.mssqlExclude) }}
+ fail-fast: false
+ env:
+ RUNS_ON: ${{ fromJson(needs.build-info.outputs.runsOn) }}
+ BACKEND: mssql
+ PYTHON_MAJOR_MINOR_VERSION: ${{ matrix.python-version }}
+ MSSQL_VERSION: ${{ matrix.mssql-version }}
+ TEST_TYPES: "${{needs.build-info.outputs.testTypes}}"
+ GITHUB_REGISTRY: ${{ needs.ci-images.outputs.githubRegistry }}
+ if: needs.build-info.outputs.run-tests == 'true'
+ steps:
+ - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
+ uses: actions/checkout@v2
+ with:
+ persist-credentials: false
+ - name: "Setup python"
+ uses: actions/setup-python@v2
+ with:
+ python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
+ - name: "Free space"
+ run: ./scripts/ci/tools/ci_free_space_on_ci.sh
+ - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{
env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
+ run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
+ - name: "Tests: ${{needs.build-info.outputs.testTypes}}"
+ run: ./scripts/ci/testing/ci_run_airflow_testing.sh
+ - name: "Upload airflow logs"
+ uses: actions/upload-artifact@v2
+ if: failure()
+ with:
+ name: airflow-logs-${{matrix.python-version}}-${{matrix.mssql-version}}
+ path: "./files/airflow_logs*"
+ retention-days: 7
+ - name: "Upload container logs"
+ uses: actions/upload-artifact@v2
+ if: failure()
+ with:
+ name: container-logs-mssql-${{matrix.python-version}}-${{matrix.mssql-version}}
+ path: "./files/container_logs*"
+ retention-days: 7
+ - name: "Upload artifact for coverage"
+ uses: actions/upload-artifact@v2
+ with:
+ name: coverage-mssql-${{matrix.python-version}}-${{matrix.mssql-version}}
+ path: "./files/coverage*.xml"
+ retention-days: 7
+
tests-sqlite:
timeout-minutes: 130
name: >
@@ -898,6 +956,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
- tests-postgres
- tests-sqlite
- tests-mysql
+ - tests-mssql
- tests-quarantined
env:
RUNS_ON: ${{ fromJson(needs.build-info.outputs.runsOn) }}
@@ -1046,6 +1105,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
- tests-sqlite
- tests-postgres
- tests-mysql
+ - tests-mssql
- tests-kubernetes
- prod-images
- docs
@@ -1107,6 +1167,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
- tests-sqlite
- tests-postgres
- tests-mysql
+ - tests-mssql
- tests-kubernetes
- ci-images
- docs
@@ -1151,6 +1212,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
- static-checks-pylint
- tests-sqlite
- tests-mysql
+ - tests-mssql
- tests-postgres
- tests-kubernetes
env:
@@ -1221,6 +1283,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
- tests-sqlite
- tests-postgres
- tests-mysql
+ - tests-mssql
- tests-kubernetes
- constraints
- prepare-test-provider-packages-wheel
diff --git a/BREEZE.rst b/BREEZE.rst
index ed2b4fd..2f0510a 100644
--- a/BREEZE.rst
+++ b/BREEZE.rst
@@ -1883,7 +1883,7 @@ This is the current syntax for `./breeze <./breeze>`_:
Backend to use for tests - it determines which database is used.
One of:
- sqlite mysql postgres
+ sqlite mysql postgres mssql
Default: sqlite
@@ -2349,7 +2349,7 @@ This is the current syntax for `./breeze <./breeze>`_:
Backend to use for tests - it determines which database is used.
One of:
- sqlite mysql postgres
+ sqlite mysql postgres mssql
Default: sqlite
diff --git a/Dockerfile.ci b/Dockerfile.ci
index 92b6a45..14e4294 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -161,6 +161,17 @@ RUN mkdir -pv /usr/share/man/man1 \
&& apt-get autoremove -yqq --purge \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* \
+ && curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - \
+ && curl https://packages.microsoft.com/config/debian/9/prod.list > /etc/apt/sources.list.d/mssql-release.list \
+ && apt-get update -yqq \
+ && apt-get upgrade -yqq \
+ && ACCEPT_EULA=Y apt-get -yqq install -y --no-install-recommends \
+ gcc \
+ unixodbc-dev \
+ g++ \
+ msodbcsql17 \
+ mssql-tools \
+ && rm -rf /var/lib/apt/lists/* \
&& curl
https://download.docker.com/linux/static/stable/x86_64/docker-${DOCKER_CLI_VERSION}.tgz
\
| tar -C /usr/bin --strip-components=1 -xvzf - docker/docker
diff --git a/README.md b/README.md
index c9a8c86..d0d2e17 100644
--- a/README.md
+++ b/README.md
@@ -97,13 +97,14 @@ We **highly** recommend upgrading to the latest Airflow major release at the ear
Apache Airflow is tested with:
-|              | Master version (dev)      | Stable version (2.0.2)   | Previous version (1.10.15) |
-| ------------ | ------------------------- | ------------------------ | --------------------------- |
-| Python       | 3.6, 3.7, 3.8             | 3.6, 3.7, 3.8            | 2.7, 3.5, 3.6, 3.7, 3.8     |
-| Kubernetes   | 1.20, 1.19, 1.18          | 1.20, 1.19, 1.18         | 1.18, 1.17, 1.16            |
-| PostgreSQL   | 9.6, 10, 11, 12, 13       | 9.6, 10, 11, 12, 13      | 9.6, 10, 11, 12, 13         |
-| MySQL        | 5.7, 8                    | 5.7, 8                   | 5.6, 5.7                    |
-| SQLite       | 3.15.0+                   | 3.15.0+                  | 3.15.0+                     |
+|                      | Master version (dev)      | Stable version (2.0.2)   | Previous version (1.10.15) |
+| -------------------- | ------------------------- | ------------------------ | --------------------------- |
+| Python               | 3.6, 3.7, 3.8             | 3.6, 3.7, 3.8            | 2.7, 3.5, 3.6, 3.7, 3.8     |
+| Kubernetes           | 1.20, 1.19, 1.18          | 1.20, 1.19, 1.18         | 1.18, 1.17, 1.16            |
+| PostgreSQL           | 9.6, 10, 11, 12, 13       | 9.6, 10, 11, 12, 13      | 9.6, 10, 11, 12, 13         |
+| MySQL                | 5.7, 8                    | 5.7, 8                   | 5.6, 5.7                    |
+| SQLite               | 3.15.0+                   | 3.15.0+                  | 3.15.0+                     |
+| MSSQL (Experimental) | 2017, 2019                |                          |                             |
**Note:** MySQL 5.x versions are unable to or have limitations with
running multiple schedulers -- please see the [Scheduler
docs](https://airflow.apache.org/docs/apache-airflow/stable/scheduler.html).
diff --git a/airflow/api_connexion/parameters.py
b/airflow/api_connexion/parameters.py
index 8e06301..1dac50b 100644
--- a/airflow/api_connexion/parameters.py
+++ b/airflow/api_connexion/parameters.py
@@ -18,7 +18,7 @@ from functools import wraps
from typing import Callable, Dict, TypeVar, cast
from pendulum.parsing import ParserError
-from sqlalchemy import asc, desc
+from sqlalchemy import text
from airflow.api_connexion.exceptions import BadRequest
from airflow.configuration import conf
@@ -97,11 +97,10 @@ def apply_sorting(query, order_by, to_replace=None,
allowed_attrs=None):
detail=f"Ordering with '{lstriped_orderby}' is disallowed or "
f"the attribute does not exist on the model"
)
+ if to_replace:
+ lstriped_orderby = to_replace.get(lstriped_orderby, lstriped_orderby)
if order_by[0] == "-":
- func = desc
- order_by = lstriped_orderby
+ order_by = f"{lstriped_orderby} desc"
else:
- func = asc
- if to_replace:
- order_by = to_replace.get(order_by, order_by)
- return query.order_by(func(order_by))
+ order_by = f"{lstriped_orderby} asc"
+ return query.order_by(text(order_by))
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index b400ae9..bf4ad42 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -37,6 +37,7 @@ from sqlalchemy import and_, func, not_, or_, tuple_
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import load_only, selectinload
from sqlalchemy.orm.session import Session, make_transient
+from sqlalchemy.sql import expression
from airflow import models, settings
from airflow.configuration import conf
@@ -1067,15 +1068,15 @@ class SchedulerJob(BaseJob): # pylint:
disable=too-many-instance-attributes
task_instance_str = "\n\t".join(repr(x) for x in executable_tis)
self.log.info("Setting the following tasks to queued state:\n\t%s",
task_instance_str)
-
- # set TIs to queued state
- filter_for_tis = TI.filter_for_tis(executable_tis)
- session.query(TI).filter(filter_for_tis).update(
- # TODO[ha]: should we use func.now()? How does that work with DB
timezone on mysql when it's not
- # UTC?
- {TI.state: State.QUEUED, TI.queued_dttm: timezone.utcnow(),
TI.queued_by_job_id: self.id},
- synchronize_session=False,
- )
+ if len(executable_tis) > 0:
+ # set TIs to queued state
+ filter_for_tis = TI.filter_for_tis(executable_tis)
+ session.query(TI).filter(filter_for_tis).update(
+ # TODO[ha]: should we use func.now()? How does that work with DB timezone
+ # on mysql when it's not UTC?
+ {TI.state: State.QUEUED, TI.queued_dttm: timezone.utcnow(), TI.queued_by_job_id: self.id},
+ synchronize_session=False,
+ )
for ti in executable_tis:
make_transient(ti)
@@ -1580,14 +1581,24 @@ class SchedulerJob(BaseJob): # pylint:
disable=too-many-instance-attributes
# as DagModel.dag_id and DagModel.next_dagrun
# This list is used to verify if the DagRun already exist so that we
don't attempt to create
# duplicate dag runs
- active_dagruns = (
- session.query(DagRun.dag_id, DagRun.execution_date)
- .filter(
- tuple_(DagRun.dag_id, DagRun.execution_date).in_(
- [(dm.dag_id, dm.next_dagrun) for dm in dag_models]
- )
+
+ if session.bind.dialect.name == 'mssql':
+ active_dagruns_filter = or_(
+ *[
+ and_(
+ DagRun.dag_id == dm.dag_id,
+ DagRun.execution_date == dm.next_dagrun,
+ )
+ for dm in dag_models
+ ]
)
- .all()
+ else:
+ active_dagruns_filter = tuple_(DagRun.dag_id, DagRun.execution_date).in_(
+ [(dm.dag_id, dm.next_dagrun) for dm in dag_models]
+ )
+
+ active_dagruns = (
+ session.query(DagRun.dag_id, DagRun.execution_date).filter(active_dagruns_filter).all()
)
for dag_model in dag_models:
@@ -1644,7 +1655,7 @@ class SchedulerJob(BaseJob): # pylint:
disable=too-many-instance-attributes
.filter(
DagRun.dag_id.in_([o.dag_id for o in dag_models]),
DagRun.state == State.RUNNING, # pylint:
disable=comparison-with-callable
- DagRun.external_trigger.is_(False),
+ DagRun.external_trigger == expression.false(),
)
.group_by(DagRun.dag_id)
.all()
diff --git
a/airflow/migrations/versions/83f031fd9f1c_improve_mssql_compatibility.py
b/airflow/migrations/versions/83f031fd9f1c_improve_mssql_compatibility.py
new file mode 100644
index 0000000..ce5f3fc
--- /dev/null
+++ b/airflow/migrations/versions/83f031fd9f1c_improve_mssql_compatibility.py
@@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""improve mssql compatibility
+
+Revision ID: 83f031fd9f1c
+Revises: a13f7613ad25
+Create Date: 2021-04-06 12:22:02.197726
+
+"""
+
+from collections import defaultdict
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mssql
+
+# revision identifiers, used by Alembic.
+revision = '83f031fd9f1c'
+down_revision = 'a13f7613ad25'
+branch_labels = None
+depends_on = None
+
+
+def is_table_empty(conn, table_name):
+ """
+ This function checks if the MS SQL table is empty
+
+ :param conn: SQL connection object
+ :param table_name: table name
+ :return: Boolean indicating whether the table is empty
+ """
+ return conn.execute(f'select TOP 1 * from {table_name}').first() is None
+
+
+def get_table_constraints(conn, table_name):
+ """
+ This function returns the primary and unique constraints
+ along with their column names. Some tables, like task_instance,
+ are missing the primary key constraint name, and the name is
+ auto-generated by SQL Server, so this function helps to
+ retrieve any primary or unique constraint name.
+
+ :param conn: sql connection object
+ :param table_name: table name
+ :return: a dictionary of ((constraint name, constraint type), column name) of table
+ :rtype: defaultdict(list)
+ """
+ query = """SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
+ FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
+ JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
+ WHERE tc.TABLE_NAME = '{table_name}' AND
+ (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE')
+ """.format(
+ table_name=table_name
+ )
+ result = conn.execute(query).fetchall()
+ constraint_dict = defaultdict(list)
+ for constraint, constraint_type, column in result:
+ constraint_dict[(constraint, constraint_type)].append(column)
+ return constraint_dict
+
+
+def drop_column_constraints(operator, column_name, constraint_dict):
+ """
+ Drop a primary key or unique constraint
+
+ :param operator: batch_alter_table for the table
+ :param column_name: name of the column whose constraints should be dropped
+ :param constraint_dict: a dictionary of ((constraint name, constraint type), column name) of table
+ """
+ for constraint, columns in constraint_dict.items():
+ if column_name in columns:
+ if constraint[1].lower().startswith("primary"):
+ operator.drop_constraint(constraint[0], type_='primary')
+ elif constraint[1].lower().startswith("unique"):
+ operator.drop_constraint(constraint[0], type_='unique')
+
+
+def create_constraints(operator, column_name, constraint_dict):
+ """
+ Create a primary key or unique constraint
+
+ :param operator: batch_alter_table for the table
+ :param column_name: name of the column whose constraints should be recreated
+ :param constraint_dict: a dictionary of ((constraint name, constraint type), column name) of table
+ """
+ for constraint, columns in constraint_dict.items():
+ if column_name in columns:
+ if constraint[1].lower().startswith("primary"):
+ operator.create_primary_key(constraint_name=constraint[0], columns=columns)
+ elif constraint[1].lower().startswith("unique"):
+ operator.create_unique_constraint(constraint_name=constraint[0], columns=columns)
+
+
+def _use_date_time2(conn):
+ result = conn.execute(
+ """SELECT CASE WHEN CONVERT(VARCHAR(128), SERVERPROPERTY
('productversion'))
+ like '8%' THEN '2000' WHEN CONVERT(VARCHAR(128), SERVERPROPERTY
('productversion'))
+ like '9%' THEN '2005' ELSE '2005Plus' END AS MajorVersion"""
+ ).fetchone()
+ mssql_version = result[0]
+ return mssql_version not in ("2000", "2005")
+
+
+def _is_timestamp(conn, table_name, column_name):
+ query = f"""SELECT
+ TYPE_NAME(C.USER_TYPE_ID) AS DATA_TYPE
+ FROM SYS.COLUMNS C
+ JOIN SYS.TYPES T
+ ON C.USER_TYPE_ID=T.USER_TYPE_ID
+ WHERE C.OBJECT_ID=OBJECT_ID('{table_name}') and C.NAME='{column_name}';
+ """
+ column_type = conn.execute(query).fetchone()[0]
+ return column_type == "timestamp"
+
+
+def recreate_mssql_ts_column(conn, op, table_name, column_name):
+ """
+ Drop the timestamp column and recreate it as
+ datetime or datetime2(6)
+ """
+ if _is_timestamp(conn, table_name, column_name) and is_table_empty(conn,
table_name):
+ with op.batch_alter_table(table_name) as batch_op:
+ constraint_dict = get_table_constraints(conn, table_name)
+ drop_column_constraints(batch_op, column_name, constraint_dict)
+ batch_op.drop_column(column_name=column_name)
+ if _use_date_time2(conn):
+ batch_op.add_column(sa.Column(column_name,
mssql.DATETIME2(precision=6), nullable=False))
+ else:
+ batch_op.add_column(sa.Column(column_name, mssql.DATETIME,
nullable=False))
+ create_constraints(batch_op, column_name, constraint_dict)
+
+
+def alter_mssql_datetime_column(conn, op, table_name, column_name, nullable):
+ """Update the datetime column to datetime2(6)"""
+ if _use_date_time2(conn):
+ op.alter_column(
+ table_name=table_name,
+ column_name=column_name,
+ type_=mssql.DATETIME2(precision=6),
+ nullable=nullable,
+ )
+
+
+def alter_mssql_datetime2_column(conn, op, table_name, column_name, nullable):
+ """Update the datetime2(6) column to datetime"""
+ if _use_date_time2(conn):
+ op.alter_column(
+ table_name=table_name, column_name=column_name,
type_=mssql.DATETIME, nullable=nullable
+ )
+
+
+def _get_timestamp(conn):
+ if _use_date_time2(conn):
+ return mssql.DATETIME2(precision=6)
+ else:
+ return mssql.DATETIME
+
+
+def upgrade():
+ """Improve compatibility with MSSQL backend"""
+ conn = op.get_bind()
+ if conn.dialect.name != 'mssql':
+ return
+ recreate_mssql_ts_column(conn, op, 'dag_code', 'last_updated')
+ recreate_mssql_ts_column(conn, op, 'rendered_task_instance_fields',
'execution_date')
+ alter_mssql_datetime_column(conn, op, 'serialized_dag', 'last_updated',
False)
+ op.alter_column(table_name="xcom", column_name="timestamp",
type_=_get_timestamp(conn), nullable=False)
+ with op.batch_alter_table('task_reschedule') as task_reschedule_batch_op:
+ task_reschedule_batch_op.alter_column(
+ column_name='end_date', type_=_get_timestamp(conn), nullable=False
+ )
+ task_reschedule_batch_op.alter_column(
+ column_name='reschedule_date', type_=_get_timestamp(conn),
nullable=False
+ )
+ task_reschedule_batch_op.alter_column(
+ column_name='start_date', type_=_get_timestamp(conn),
nullable=False
+ )
+ with op.batch_alter_table('task_fail') as task_fail_batch_op:
+ task_fail_batch_op.drop_index('idx_task_fail_dag_task_date')
+ task_fail_batch_op.alter_column(
+ column_name="execution_date", type_=_get_timestamp(conn),
nullable=False
+ )
+ task_fail_batch_op.create_index(
+ 'idx_task_fail_dag_task_date', ['dag_id', 'task_id',
'execution_date'], unique=False
+ )
+ with op.batch_alter_table('task_instance') as task_instance_batch_op:
+ task_instance_batch_op.drop_index('ti_state_lkp')
+ task_instance_batch_op.create_index(
+ 'ti_state_lkp', ['dag_id', 'task_id', 'execution_date', 'state'],
unique=False
+ )
+ constraint_dict = get_table_constraints(conn, 'dag_run')
+ for constraint, columns in constraint_dict.items():
+ if 'dag_id' in columns:
+ if constraint[1].lower().startswith("unique"):
+ op.drop_constraint(constraint[0], 'dag_run', type_='unique')
+ # create filtered indexes
+ conn.execute(
+ """CREATE UNIQUE NONCLUSTERED INDEX idx_not_null_dag_id_execution_date
+ ON dag_run(dag_id,execution_date)
+ WHERE dag_id IS NOT NULL and execution_date is not null"""
+ )
+ conn.execute(
+ """CREATE UNIQUE NONCLUSTERED INDEX idx_not_null_dag_id_run_id
+ ON dag_run(dag_id,run_id)
+ WHERE dag_id IS NOT NULL and run_id is not null"""
+ )
+
+
+def downgrade():
+ """Reverse MSSQL backend compatibility improvements"""
+ conn = op.get_bind()
+ if conn.dialect.name != 'mssql':
+ return
+ alter_mssql_datetime2_column(conn, op, 'serialized_dag', 'last_updated',
False)
+ op.alter_column(table_name="xcom", column_name="timestamp",
type_=_get_timestamp(conn), nullable=True)
+ with op.batch_alter_table('task_reschedule') as task_reschedule_batch_op:
+ task_reschedule_batch_op.alter_column(
+ column_name='end_date', type_=_get_timestamp(conn), nullable=True
+ )
+ task_reschedule_batch_op.alter_column(
+ column_name='reschedule_date', type_=_get_timestamp(conn),
nullable=True
+ )
+ task_reschedule_batch_op.alter_column(
+ column_name='start_date', type_=_get_timestamp(conn), nullable=True
+ )
+ with op.batch_alter_table('task_fail') as task_fail_batch_op:
+ task_fail_batch_op.drop_index('idx_task_fail_dag_task_date')
+ task_fail_batch_op.alter_column(
+ column_name="execution_date", type_=_get_timestamp(conn),
nullable=False
+ )
+ task_fail_batch_op.create_index(
+ 'idx_task_fail_dag_task_date', ['dag_id', 'task_id',
'execution_date'], unique=False
+ )
+ with op.batch_alter_table('task_instance') as task_instance_batch_op:
+ task_instance_batch_op.drop_index('ti_state_lkp')
+ task_instance_batch_op.create_index(
+ 'ti_state_lkp', ['dag_id', 'task_id', 'execution_date'],
unique=False
+ )
+ op.create_unique_constraint('UQ__dag_run__dag_id_run_id', 'dag_run',
['dag_id', 'run_id'])
+ op.create_unique_constraint('UQ__dag_run__dag_id_execution_date',
'dag_run', ['dag_id', 'execution_date'])
+ op.drop_index('idx_not_null_dag_id_execution_date', table_name='dag_run')
+ op.drop_index('idx_not_null_dag_id_run_id', table_name='dag_run')
diff --git
a/airflow/migrations/versions/8d48763f6d53_add_unique_constraint_to_conn_id.py
b/airflow/migrations/versions/8d48763f6d53_add_unique_constraint_to_conn_id.py
index 2c743e4..44be988 100644
---
a/airflow/migrations/versions/8d48763f6d53_add_unique_constraint_to_conn_id.py
+++
b/airflow/migrations/versions/8d48763f6d53_add_unique_constraint_to_conn_id.py
@@ -38,9 +38,9 @@ def upgrade():
"""Apply add unique constraint to conn_id and set it as non-nullable"""
try:
with op.batch_alter_table('connection') as batch_op:
+ batch_op.alter_column("conn_id", nullable=False,
existing_type=sa.String(250))
batch_op.create_unique_constraint(constraint_name="unique_conn_id",
columns=["conn_id"])
- batch_op.alter_column("conn_id", nullable=False,
existing_type=sa.String(250))
except sa.exc.IntegrityError:
raise Exception("Make sure there are no duplicate connections with the
same conn_id or null values")
diff --git
a/airflow/migrations/versions/98271e7606e2_add_scheduling_decision_to_dagrun_and_.py
b/airflow/migrations/versions/98271e7606e2_add_scheduling_decision_to_dagrun_and_.py
index 8019aa2..d220d32 100644
---
a/airflow/migrations/versions/98271e7606e2_add_scheduling_decision_to_dagrun_and_.py
+++
b/airflow/migrations/versions/98271e7606e2_add_scheduling_decision_to_dagrun_and_.py
@@ -26,7 +26,7 @@ Create Date: 2020-10-01 12:13:32.968148
import sqlalchemy as sa
from alembic import op
-from sqlalchemy.dialects import mysql
+from sqlalchemy.dialects import mssql, mysql
# revision identifiers, used by Alembic.
revision = '98271e7606e2'
@@ -35,12 +35,32 @@ branch_labels = None
depends_on = None
+def _use_date_time2(conn):
+ result = conn.execute(
+ """SELECT CASE WHEN CONVERT(VARCHAR(128), SERVERPROPERTY
('productversion'))
+ like '8%' THEN '2000' WHEN CONVERT(VARCHAR(128), SERVERPROPERTY
('productversion'))
+ like '9%' THEN '2005' ELSE '2005Plus' END AS MajorVersion"""
+ ).fetchone()
+ mssql_version = result[0]
+ return mssql_version not in ("2000", "2005")
+
+
+def _get_timestamp(conn):
+ dialect_name = conn.dialect.name
+ if dialect_name == "mssql":
+ return mssql.DATETIME2(precision=6) if _use_date_time2(conn) else
mssql.DATETIME
+ elif dialect_name != "mysql":
+ return sa.TIMESTAMP(timezone=True)
+ else:
+ return mysql.TIMESTAMP(fsp=6, timezone=True)
+
+
def upgrade():
"""Apply Add scheduling_decision to DagRun and DAG"""
conn = op.get_bind() # pylint: disable=no-member
- is_mysql = bool(conn.dialect.name == "mysql")
is_sqlite = bool(conn.dialect.name == "sqlite")
- timestamp = sa.TIMESTAMP(timezone=True) if not is_mysql else
mysql.TIMESTAMP(fsp=6, timezone=True)
+ is_mssql = bool(conn.dialect.name == "mssql")
+ timestamp = _get_timestamp(conn)
if is_sqlite:
op.execute("PRAGMA foreign_keys=off")
@@ -71,7 +91,7 @@ def upgrade():
op.execute(
"UPDATE dag SET concurrency={}, has_task_concurrency_limits={} where
concurrency IS NULL".format(
- concurrency, 1 if is_sqlite else sa.true()
+ concurrency, 1 if is_sqlite or is_mssql else sa.true()
)
)
diff --git
a/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py
b/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py
index f063aea..045f5a6 100644
--- a/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py
+++ b/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py
@@ -24,6 +24,8 @@ Create Date: 2019-10-29 13:53:09.445943
"""
+from collections import defaultdict
+
from alembic import op
from sqlalchemy import Column, Integer
from sqlalchemy.engine.reflection import Inspector
@@ -35,6 +37,64 @@ branch_labels = None
depends_on = None
+def get_table_constraints(conn, table_name):
+ """
+ This function returns the primary and unique constraints
+ along with their column names. Some tables, like `task_instance`,
+ are missing the primary key constraint name, and the name is
+ auto-generated by SQL Server, so this function helps to
+ retrieve any primary or unique constraint name.
+
+ :param conn: sql connection object
+ :param table_name: table name
+ :return: a dictionary of ((constraint name, constraint type), column name) of table
+ :rtype: defaultdict(list)
+ """
+ query = """SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
+ FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
+ JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
+ WHERE tc.TABLE_NAME = '{table_name}' AND
+ (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE')
+ """.format(
+ table_name=table_name
+ )
+ result = conn.execute(query).fetchall()
+ constraint_dict = defaultdict(list)
+ for constraint, constraint_type, column in result:
+ constraint_dict[(constraint, constraint_type)].append(column)
+ return constraint_dict
+
+
+def drop_column_constraints(operator, column_name, constraint_dict):
+ """
+ Drop a primary key or unique constraint
+
+ :param operator: batch_alter_table for the table
+ :param column_name: name of the column whose constraints should be dropped
+ :param constraint_dict: a dictionary of ((constraint name, constraint type), column name) of table
+ """
+ for constraint, columns in constraint_dict.items():
+ if column_name in columns:
+ if constraint[1].lower().startswith("primary"):
+ operator.drop_constraint(constraint[0], type_='primary')
+ elif constraint[1].lower().startswith("unique"):
+ operator.drop_constraint(constraint[0], type_='unique')
+
+
+def create_constraints(operator, column_name, constraint_dict):
+ """
+ Create a primary key or unique constraint
+
+ :param operator: batch_alter_table for the table
+ :param column_name: name of the column whose constraints should be recreated
+ :param constraint_dict: a dictionary of ((constraint name, constraint type), column name) of table
+ """
+ for constraint, columns in constraint_dict.items():
+ if column_name in columns:
+ if constraint[1].lower().startswith("primary"):
+ operator.create_primary_key(constraint_name=constraint[0], columns=columns)
+ elif constraint[1].lower().startswith("unique"):
+ operator.create_unique_constraint(constraint_name=constraint[0], columns=columns)
+
+
def upgrade():
"""Apply Remove id column from xcom"""
conn = op.get_bind()
@@ -43,9 +103,14 @@ def upgrade():
with op.batch_alter_table('xcom') as bop:
xcom_columns = [col.get('name') for col in
inspector.get_columns("xcom")]
if "id" in xcom_columns:
+ if conn.dialect.name == 'mssql':
+ constraint_dict = get_table_constraints(conn, "xcom")
+ drop_column_constraints(bop, 'id', constraint_dict)
bop.drop_column('id')
bop.drop_index('idx_xcom_dag_task_date')
- bop.create_primary_key('pk_xcom', ['dag_id', 'task_id', 'key', 'execution_date'])
+ # mssql doesn't allow primary keys with nullable columns
+ if conn.dialect.name != 'mssql':
+ bop.create_primary_key('pk_xcom', ['dag_id', 'task_id', 'key', 'execution_date'])
def downgrade():
diff --git
a/airflow/migrations/versions/e9304a3141f0_make_xcom_pkey_columns_non_nullable.py
b/airflow/migrations/versions/e9304a3141f0_make_xcom_pkey_columns_non_nullable.py
new file mode 100644
index 0000000..bde065b
--- /dev/null
+++
b/airflow/migrations/versions/e9304a3141f0_make_xcom_pkey_columns_non_nullable.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""make xcom pkey columns non-nullable
+
+Revision ID: e9304a3141f0
+Revises: 83f031fd9f1c
+Create Date: 2021-04-06 13:22:02.197726
+
+"""
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mssql, mysql
+
+from airflow.models.base import COLLATION_ARGS
+
+# revision identifiers, used by Alembic.
+revision = 'e9304a3141f0'
+down_revision = '83f031fd9f1c'
+branch_labels = None
+depends_on = None
+
+
+def _use_date_time2(conn):
+ result = conn.execute(
+ """SELECT CASE WHEN CONVERT(VARCHAR(128), SERVERPROPERTY
('productversion'))
+ like '8%' THEN '2000' WHEN CONVERT(VARCHAR(128), SERVERPROPERTY
('productversion'))
+ like '9%' THEN '2005' ELSE '2005Plus' END AS MajorVersion"""
+ ).fetchone()
+ mssql_version = result[0]
+ return mssql_version not in ("2000", "2005")
+
+
+def _get_timestamp(conn):
+ dialect_name = conn.dialect.name
+ if dialect_name == "mssql":
+ return mssql.DATETIME2(precision=6) if _use_date_time2(conn) else
mssql.DATETIME
+ elif dialect_name == "mysql":
+ return mysql.TIMESTAMP(fsp=6, timezone=True)
+ else:
+ return sa.TIMESTAMP(timezone=True)
+
+
+def upgrade():
+ """Apply make xcom pkey columns non-nullable"""
+ conn = op.get_bind()
+ with op.batch_alter_table('xcom') as bop:
+ bop.alter_column("key", type_=sa.String(length=512, **COLLATION_ARGS),
nullable=False)
+ bop.alter_column("execution_date", type_=_get_timestamp(conn),
nullable=False)
+ if conn.dialect.name == 'mssql':
+ bop.create_primary_key('pk_xcom', ['dag_id', 'task_id', 'key',
'execution_date'])
+
+
+def downgrade():
+ """Unapply make xcom pkey columns non-nullable"""
+ conn = op.get_bind()
+ with op.batch_alter_table('xcom') as bop:
+ if conn.dialect.name == 'mssql':
+ bop.drop_constraint('pk_xcom', 'primary')
+ bop.alter_column("key", type_=sa.String(length=512, **COLLATION_ARGS),
nullable=True)
+ bop.alter_column("execution_date", type_=_get_timestamp(conn),
nullable=True)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 9f929bf..1cf3797 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -52,6 +52,7 @@ from jinja2.nativetypes import NativeEnvironment
from sqlalchemy import Boolean, Column, ForeignKey, Index, Integer, String,
Text, func, or_
from sqlalchemy.orm import backref, joinedload, relationship
from sqlalchemy.orm.session import Session
+from sqlalchemy.sql import expression
import airflow.templates
from airflow import settings, utils
@@ -100,7 +101,7 @@ def get_last_dagrun(dag_id, session,
include_externally_triggered=False):
DR = DagRun
query = session.query(DR).filter(DR.dag_id == dag_id)
if not include_externally_triggered:
- query = query.filter(DR.external_trigger == False) # noqa pylint:
disable=singleton-comparison
+ query = query.filter(DR.external_trigger == expression.false())
query = query.order_by(DR.execution_date.desc())
return query.first()
@@ -897,7 +898,9 @@ class DAG(LoggingMixin):
)
if external_trigger is not None:
- query = query.filter(DagRun.external_trigger == external_trigger)
+ query = query.filter(
+ DagRun.external_trigger == (expression.true() if external_trigger else expression.false())
+ )
return query.scalar()
@@ -1872,7 +1875,7 @@ class DAG(LoggingMixin):
.filter(
DagRun.dag_id.in_(existing_dag_ids),
DagRun.state == State.RUNNING, # pylint:
disable=comparison-with-callable
- DagRun.external_trigger.is_(False),
+ DagRun.external_trigger == expression.false(),
)
.group_by(DagRun.dag_id)
.all()
@@ -2186,7 +2189,7 @@ class DagModel(Base):
"""
paused_dag_ids = (
session.query(DagModel.dag_id)
- .filter(DagModel.is_paused.is_(True))
+ .filter(DagModel.is_paused == expression.true())
.filter(DagModel.dag_id.in_(dag_ids))
.all()
)
@@ -2270,8 +2273,8 @@ class DagModel(Base):
query = (
session.query(cls)
.filter(
- cls.is_paused.is_(False),
- cls.is_active.is_(True),
+ cls.is_paused == expression.false(),
+ cls.is_active == expression.true(),
cls.next_dagrun_create_after <= func.now(),
)
.order_by(cls.next_dagrun_create_after)
diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py
index ce65fc2..4295a5b 100644
--- a/airflow/models/dagcode.py
+++ b/airflow/models/dagcode.py
@@ -20,7 +20,8 @@ import struct
from datetime import datetime
from typing import Iterable, List, Optional
-from sqlalchemy import BigInteger, Column, String, Text, exists
+from sqlalchemy import BigInteger, Column, String, Text
+from sqlalchemy.sql.expression import literal
from airflow.exceptions import AirflowException, DagCodeNotFound
from airflow.models.base import Base
@@ -147,7 +148,7 @@ class DagCode(Base):
:param session: ORM Session
"""
fileloc_hash = cls.dag_fileloc_hash(fileloc)
- return session.query(exists().where(cls.fileloc_hash == fileloc_hash)).scalar()
+ return session.query(literal(True)).filter(cls.fileloc_hash == fileloc_hash).one_or_none() is not None
@classmethod
def get_code_by_fileloc(cls, fileloc: str) -> str:
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 6bd96f3..e18351d 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -35,6 +35,7 @@ from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import backref, relationship, synonym
from sqlalchemy.orm.session import Session
+from sqlalchemy.sql import expression
from airflow import settings
from airflow.configuration import conf as airflow_conf
@@ -211,8 +212,8 @@ class DagRun(Base, LoggingMixin):
DagModel.dag_id == cls.dag_id,
)
.filter(
- DagModel.is_paused.is_(False),
- DagModel.is_active.is_(True),
+ DagModel.is_paused == expression.false(),
+ DagModel.is_active == expression.true(),
)
.order_by(
nulls_first(cls.last_scheduling_decision, session=session),
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 81448b6..d0db6e8 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -26,8 +26,7 @@ from typing import Any, Dict, List, Optional
import sqlalchemy_jsonfield
from sqlalchemy import BigInteger, Column, Index, String, and_
from sqlalchemy.orm import Session, backref, foreign, relationship
-from sqlalchemy.sql import exists
-from sqlalchemy.sql.expression import func
+from sqlalchemy.sql.expression import func, literal
from airflow.models.base import ID_LEN, Base
from airflow.models.dag import DAG, DagModel
@@ -117,14 +116,20 @@ class SerializedDagModel(Base):
# If Yes, does nothing
# If No or the DAG does not exists, updates / writes Serialized DAG to
DB
if min_update_interval is not None:
- if session.query(
- exists().where(
+ if (
+ session.query(literal(True))
+ .filter(
and_(
cls.dag_id == dag.dag_id,
(timezone.utcnow() -
timedelta(seconds=min_update_interval)) < cls.last_updated,
)
)
- ).scalar():
+ .first()
+ is not None
+ ):
+ # TODO: .first() is not None can be changed to .scalar() once we update to sqlalchemy 1.4+
+ # as the associated sqlalchemy bug for MySQL was fixed
+ # related issue : https://github.com/sqlalchemy/sqlalchemy/issues/5481
return False
log.debug("Checking if DAG (%s) changed", dag.dag_id)
@@ -217,7 +222,7 @@ class SerializedDagModel(Base):
:param dag_id: the DAG to check
:param session: ORM Session
"""
- return session.query(exists().where(cls.dag_id == dag_id)).scalar()
+ return session.query(literal(True)).filter(cls.dag_id == dag_id).first() is not None
@classmethod
@provide_session
@@ -313,7 +318,9 @@ class SerializedDagModel(Base):
if session.bind.dialect.name in ["sqlite", "mysql"]:
for row in session.query(cls.dag_id, func.json_extract(cls.data,
"$.dag.dag_dependencies")).all():
dependencies[row[0]] = [DagDependency(**d) for d in
json.loads(row[1])]
-
+ elif session.bind.dialect.name == "mssql":
+ for row in session.query(cls.dag_id, func.json_query(cls.data, "$.dag.dag_dependencies")).all():
+ dependencies[row[0]] = [DagDependency(**d) for d in json.loads(row[1])]
else:
for row in session.query(
cls.dag_id, func.json_extract_path(cls.data, "dag",
"dag_dependencies")
diff --git a/airflow/sensors/smart_sensor.py b/airflow/sensors/smart_sensor.py
index 6c17beb..9d0a28c 100644
--- a/airflow/sensors/smart_sensor.py
+++ b/airflow/sensors/smart_sensor.py
@@ -391,23 +391,34 @@ class SmartSensorOperator(BaseOperator, SkipMixin):
:param session: The sqlalchemy session.
"""
TI = TaskInstance
- ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in
sensor_works]
- def update_ti_hostname_with_count(count, ti_keys):
+ def update_ti_hostname_with_count(count, sensor_works):
# Using or_ instead of in_ here to prevent from full table scan.
- tis = (
- session.query(TI)
- .filter(or_(tuple_(TI.dag_id, TI.task_id, TI.execution_date)
== ti_key for ti_key in ti_keys))
- .all()
- )
+ if session.bind.dialect.name == 'mssql':
+ ti_filter = or_(
+ and_(
+ TI.dag_id == ti_key.dag_id,
+ TI.task_id == ti_key.task_id,
+ TI.execution_date == ti_key.execution_date,
+ )
+ for ti_key in sensor_works
+ )
+ else:
+ ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in sensor_works]
+ ti_filter = or_(
+ tuple_(TI.dag_id, TI.task_id, TI.execution_date) == ti_key for ti_key in ti_keys
+ )
+ tis = session.query(TI).filter(ti_filter).all()
for ti in tis:
ti.hostname = self.hostname
session.commit()
- return count + len(ti_keys)
+ return count + len(sensor_works)
- count = helpers.reduce_in_chunks(update_ti_hostname_with_count, ti_keys, 0, self.max_tis_per_query)
+ count = helpers.reduce_in_chunks(
+ update_ti_hostname_with_count, sensor_works, 0, self.max_tis_per_query
+ )
if count:
self.log.info("Updated hostname on %s tis.", count)
diff --git a/airflow/www/security.py b/airflow/www/security.py
index 8768587..9b391f7 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -378,6 +378,14 @@ class AirflowSecurityManager(SecurityManager,
LoggingMixin): # pylint: disable=
return True
return resource_name.startswith(permissions.RESOURCE_DAG_PREFIX)
+ def _has_view_access(self, user, action, resource) -> bool:
+ """
+ Override the parent method to ensure that it always returns a bool.
+ The parent's _has_view_access can return None, which causes
+ issues later on, so the result is coerced to bool here.
+ """
+ return bool(super()._has_view_access(user, action, resource))
+
def has_access(self, permission, resource, user=None) -> bool:
"""
Verify whether a given user could perform certain permission
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 78dca2f..e90ec6b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -76,7 +76,7 @@ from jinja2.utils import htmlsafe_json_dumps, pformat #
type: ignore
from pendulum.datetime import DateTime
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter # noqa pylint:
disable=no-name-in-module
-from sqlalchemy import and_, desc, func, or_, union_all
+from sqlalchemy import Date, and_, desc, func, or_, union_all
from sqlalchemy.orm import joinedload
from wtforms import SelectField, validators
from wtforms.validators import InputRequired
@@ -2090,6 +2090,14 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint:
disable=too-many-public-m
@action_logging # pylint: disable=too-many-locals
def calendar(self):
"""Get DAG runs as calendar"""
+
+ def _convert_to_date(session, column):
+ """Convert column to date."""
+ if session.bind.dialect.name == 'mssql':
+ return column.cast(Date)
+ else:
+ return func.date(column)
+
dag_id = request.args.get('dag_id')
dag = current_app.dag_bag.get_dag(dag_id)
if not dag:
@@ -2103,13 +2111,13 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint:
disable=too-many-public-m
with create_session() as session:
dag_states = (
session.query(
- func.date(DagRun.execution_date).label('date'),
+ (_convert_to_date(session, DagRun.execution_date)).label('date'),
DagRun.state,
func.count('*').label('count'),
)
.filter(DagRun.dag_id == dag.dag_id)
- .group_by(func.date(DagRun.execution_date), DagRun.state)
- .order_by(func.date(DagRun.execution_date).asc())
+ .group_by(_convert_to_date(session, DagRun.execution_date), DagRun.state)
+ .order_by(_convert_to_date(session, DagRun.execution_date).asc())
.all()
)
diff --git a/breeze b/breeze
index 50f9e55..a838d65 100755
--- a/breeze
+++ b/breeze
@@ -3069,6 +3069,9 @@ function breeze::read_saved_environment_variables() {
MYSQL_VERSION="${MYSQL_VERSION:=$(parameters::read_from_file
MYSQL_VERSION)}"
MYSQL_VERSION=${MYSQL_VERSION:=${_breeze_default_mysql_version}}
+ MSSQL_VERSION="${MSSQL_VERSION:=$(parameters::read_from_file
MSSQL_VERSION)}"
+ MSSQL_VERSION=${MSSQL_VERSION:=${_breeze_default_mssql_version}}
+
# Here you read DockerHub user/account that you use
# You can populate your own images in DockerHub this way and work with the,
# You can override it with "--dockerhub-user" option and it will be stored
in .build directory
@@ -3102,6 +3105,7 @@ function breeze::read_saved_environment_variables() {
# EXECUTOR
# POSTGRES_VERSION
# MYSQL_VERSION
+# MSSQL_VERSION
# DOCKERHUB_USER
# DOCKERHUB_REPO
#
@@ -3135,6 +3139,7 @@ function breeze::check_and_save_all_params() {
parameters::check_and_save_allowed_param "EXECUTOR" "Executors"
"--executor"
parameters::check_and_save_allowed_param "POSTGRES_VERSION" "Postgres
version" "--postgres-version"
parameters::check_and_save_allowed_param "MYSQL_VERSION" "Mysql version"
"--mysql-version"
+ parameters::check_and_save_allowed_param "MSSQL_VERSION" "MSSql version"
"--mssql-version"
parameters::check_and_save_allowed_param "GITHUB_REGISTRY" "GitHub
Registry" "--github-registry"
parameters::check_allowed_param TEST_TYPE "Type of tests" "--test-type"
@@ -3156,6 +3161,7 @@ function breeze::check_and_save_all_params() {
# WEBSERVER_HOST_PORT
# POSTGRES_HOST_PORT
# MYSQL_HOST_PORT
+# MSSQL_HOST_PORT
#
#######################################################################################################
function breeze::print_cheatsheet() {
@@ -3188,6 +3194,7 @@ function breeze::print_cheatsheet() {
echo " * ${FLOWER_HOST_PORT} -> forwarded to Flower dashboard ->
airflow:5555"
echo " * ${POSTGRES_HOST_PORT} -> forwarded to Postgres database
-> postgres:5432"
echo " * ${MYSQL_HOST_PORT} -> forwarded to MySQL database ->
mysql:3306"
+ echo " * ${MSSQL_HOST_PORT} -> forwarded to MSSQL database ->
mssql:1443"
echo " * ${REDIS_HOST_PORT} -> forwarded to Redis broker ->
redis:6379"
echo
echo " Here are links to those services that you can use on host:"
diff --git a/breeze-complete b/breeze-complete
index 6126ca4..b0985eb 100644
--- a/breeze-complete
+++ b/breeze-complete
@@ -24,7 +24,7 @@
# Those cannot be made read-only as the breeze-complete must be re-sourceable
_breeze_allowed_python_major_minor_versions="3.6 3.7 3.8"
-_breeze_allowed_backends="sqlite mysql postgres"
+_breeze_allowed_backends="sqlite mysql postgres mssql"
_breeze_allowed_integrations="cassandra kerberos mongo openldap pinot rabbitmq
redis statsd trino all"
_breeze_allowed_generate_constraints_modes="source-providers pypi-providers
no-providers"
# registrys is good here even if it is not correct english. We are adding s
automatically to all variables
@@ -34,6 +34,7 @@ _breeze_allowed_kubernetes_versions="v1.20.2 v1.19.7 v1.18.15"
_breeze_allowed_helm_versions="v3.2.4"
_breeze_allowed_kind_versions="v0.10.0"
_breeze_allowed_mysql_versions="5.7 8"
+_breeze_allowed_mssql_versions="2017-latest 2019-latest"
_breeze_allowed_postgres_versions="9.6 10 11 12 13"
_breeze_allowed_kind_operations="start stop restart status deploy test shell
k9s"
_breeze_allowed_executors="KubernetesExecutor CeleryExecutor LocalExecutor
CeleryKubernetesExecutor"
@@ -54,6 +55,7 @@ _breeze_allowed_installation_methods=". apache-airflow"
_breeze_default_executor=$(echo "${_breeze_allowed_executors}" | awk
'{print $1}')
_breeze_default_postgres_version=$(echo
"${_breeze_allowed_postgres_versions}" | awk '{print $1}')
_breeze_default_mysql_version=$(echo "${_breeze_allowed_mysql_versions}" |
awk '{print $1}')
+ _breeze_default_mssql_version=$(echo "${_breeze_allowed_mssql_versions}" |
awk '{print $1}')
_breeze_default_test_type=$(echo "${_breeze_allowed_test_types}" | awk
'{print $1}')
_breeze_default_package_format=$(echo "${_breeze_allowed_package_formats}"
| awk '{print $1}')
}
@@ -175,7 +177,7 @@ verbose assume-yes assume-no assume-quit
forward-credentials init-script:
force-build-images force-pull-images force-pull-base-python-image
production-image extras: force-clean-images skip-rebuild-check
build-cache-local build-cache-pulled build-cache-disabled disable-pip-cache
dockerhub-user: dockerhub-repo: use-github-registry github-registry:
github-repository: github-image-id: generate-constraints-mode:
-postgres-version: mysql-version:
+postgres-version: mysql-version: mssql-version:
version-suffix-for-pypi: version-suffix-for-svn:
additional-extras: additional-python-deps: additional-dev-deps:
additional-runtime-deps: image-tag:
disable-mysql-client-installation constraints-location: disable-pip-cache
install-from-docker-context-files
@@ -287,6 +289,9 @@ function breeze_complete::get_known_values_breeze() {
--mysql-version)
_breeze_known_values=${_breeze_allowed_mysql_versions}
;;
+ --mssql-version)
+ _breeze_known_values=${_breeze_allowed_mssql_versions}
+ ;;
-D | --dockerhub-user)
_breeze_known_values="${_breeze_default_dockerhub_user}"
;;
diff --git a/docs/apache-airflow/installation.rst
b/docs/apache-airflow/installation.rst
index e8fb3e7..ace814e 100644
--- a/docs/apache-airflow/installation.rst
+++ b/docs/apache-airflow/installation.rst
@@ -65,6 +65,7 @@ Airflow is tested with:
* PostgreSQL: 9.6, 10, 11, 12, 13
* MySQL: 5.7, 8
* SQLite: 3.15.0+
+ * MSSQL (Experimental): 2017, 2019
* Kubernetes: 1.18.15 1.19.7 1.20.2
diff --git a/scripts/ci/docker-compose/base.yml
b/scripts/ci/docker-compose/backend-mssql-port.yml
similarity index 55%
copy from scripts/ci/docker-compose/base.yml
copy to scripts/ci/docker-compose/backend-mssql-port.yml
index eab6425..42fd418 100644
--- a/scripts/ci/docker-compose/base.yml
+++ b/scripts/ci/docker-compose/backend-mssql-port.yml
@@ -17,24 +17,6 @@
---
version: "2.2"
services:
- airflow:
- image: ${AIRFLOW_IMAGE}
- environment:
- - USER=root
- - ADDITIONAL_PATH=~/.local/bin
- -
CELERY_BROKER_URLS=amqp://guest:guest@rabbitmq:5672,redis://redis:6379/0
- - KUBECONFIG=/files/.kube/config
- - HOST_HOME=${HOME}
- env_file:
- - _docker.env
- volumes:
- # Pass docker to inside of the container so that Kind and Moto tests can
use it.
- - /var/run/docker.sock:/var/run/docker.sock
- - /dev/urandom:/dev/random # Required to get non-blocking entropy
source
+ mssql:
ports:
- - "${WEBSERVER_HOST_PORT}:8080"
- - "${FLOWER_HOST_PORT}:5555"
-volumes:
- sqlite-db-volume:
- postgres-db-volume:
- mysql-db-volume:
+ - "${MSSQL_HOST_PORT}:1433"
diff --git a/scripts/ci/docker-compose/base.yml
b/scripts/ci/docker-compose/backend-mssql.yml
similarity index 57%
copy from scripts/ci/docker-compose/base.yml
copy to scripts/ci/docker-compose/backend-mssql.yml
index eab6425..93d2da1 100644
--- a/scripts/ci/docker-compose/base.yml
+++ b/scripts/ci/docker-compose/backend-mssql.yml
@@ -18,23 +18,16 @@
version: "2.2"
services:
airflow:
- image: ${AIRFLOW_IMAGE}
environment:
- - USER=root
- - ADDITIONAL_PATH=~/.local/bin
- -
CELERY_BROKER_URLS=amqp://guest:guest@rabbitmq:5672,redis://redis:6379/0
- - KUBECONFIG=/files/.kube/config
- - HOST_HOME=${HOME}
- env_file:
- - _docker.env
+ - BACKEND=mssql
+ -
AIRFLOW__CORE__SQL_ALCHEMY_CONN=mssql+pyodbc://sa:Airflow123@mssql:1433/master?driver=ODBC+Driver+17+for+SQL+Server
+ -
AIRFLOW__CELERY__RESULT_BACKEND=db+mssql+pyodbc://sa:Airflow123@mssql:1433/master?driver=ODBC+Driver+17+for+SQL+Server
+ depends_on:
+ - mssql
+ mssql:
+ image: mcr.microsoft.com/mssql/server:${MSSQL_VERSION}
+ environment:
+ - ACCEPT_EULA=Y
+ - SA_PASSWORD=Airflow123
volumes:
- # Pass docker to inside of the container so that Kind and Moto tests can
use it.
- - /var/run/docker.sock:/var/run/docker.sock
- - /dev/urandom:/dev/random # Required to get non-blocking entropy
source
- ports:
- - "${WEBSERVER_HOST_PORT}:8080"
- - "${FLOWER_HOST_PORT}:5555"
-volumes:
- sqlite-db-volume:
- postgres-db-volume:
- mysql-db-volume:
+ - mssql-db-volume:/var/opt/mssql
diff --git a/scripts/ci/docker-compose/base.yml
b/scripts/ci/docker-compose/base.yml
index eab6425..a3d1842 100644
--- a/scripts/ci/docker-compose/base.yml
+++ b/scripts/ci/docker-compose/base.yml
@@ -38,3 +38,4 @@ volumes:
sqlite-db-volume:
postgres-db-volume:
mysql-db-volume:
+ mssql-db-volume:
diff --git a/scripts/ci/libraries/_initialization.sh
b/scripts/ci/libraries/_initialization.sh
index c734ccd..e824963 100644
--- a/scripts/ci/libraries/_initialization.sh
+++ b/scripts/ci/libraries/_initialization.sh
@@ -23,6 +23,7 @@ CURRENT_KUBERNETES_VERSIONS=()
CURRENT_KUBERNETES_MODES=()
CURRENT_POSTGRES_VERSIONS=()
CURRENT_MYSQL_VERSIONS=()
+CURRENT_MSSQL_VERSIONS=()
CURRENT_KIND_VERSIONS=()
CURRENT_HELM_VERSIONS=()
CURRENT_EXECUTOR=()
@@ -74,6 +75,7 @@ function initialization::initialize_base_variables() {
export WEBSERVER_HOST_PORT=${WEBSERVER_HOST_PORT:="28080"}
export POSTGRES_HOST_PORT=${POSTGRES_HOST_PORT:="25433"}
export MYSQL_HOST_PORT=${MYSQL_HOST_PORT:="23306"}
+ export MSSQL_HOST_PORT=${MSSQL_HOST_PORT:="21433"}
export FLOWER_HOST_PORT=${FLOWER_HOST_PORT:="25555"}
export REDIS_HOST_PORT=${REDIS_HOST_PORT:="26379"}
@@ -103,6 +105,10 @@ function initialization::initialize_base_variables() {
CURRENT_MYSQL_VERSIONS+=("5.7" "8")
export CURRENT_MYSQL_VERSIONS
+ # Currently supported versions of MSSQL
+ CURRENT_MSSQL_VERSIONS+=("2017-latest" "2019-latest")
+ export CURRENT_MSSQL_VERSIONS
+
BACKEND=${BACKEND:="sqlite"}
export BACKEND
@@ -112,6 +118,9 @@ function initialization::initialize_base_variables() {
# Default MySQL versions
export MYSQL_VERSION=${MYSQL_VERSION:=${CURRENT_MYSQL_VERSIONS[0]}}
+ # Default MSSQL version
+ export MSSQL_VERSION=${MSSQL_VERSION:=${CURRENT_MSSQL_VERSIONS[0]}}
+
# If set to true, the database will be reset at entry. Works for Postgres
and MySQL
export DB_RESET=${DB_RESET:="false"}
@@ -897,6 +906,7 @@ function initialization::make_constants_read_only() {
readonly CURRENT_KUBERNETES_MODES
readonly CURRENT_POSTGRES_VERSIONS
readonly CURRENT_MYSQL_VERSIONS
+ readonly CURRENT_MSSQL_VERSIONS
readonly CURRENT_KIND_VERSIONS
readonly CURRENT_HELM_VERSIONS
readonly CURRENT_EXECUTOR
diff --git a/scripts/ci/selective_ci_checks.sh
b/scripts/ci/selective_ci_checks.sh
index 7e77edd..9a115a2 100755
--- a/scripts/ci/selective_ci_checks.sh
+++ b/scripts/ci/selective_ci_checks.sh
@@ -103,9 +103,19 @@ function output_all_basic_variables() {
initialization::ga_output mysql-versions \
"$(initialization::parameters_to_json "${MYSQL_VERSION}")"
fi
-
initialization::ga_output default-mysql-version "${MYSQL_VERSION}"
+ if [[ ${FULL_TESTS_NEEDED_LABEL} == "true" ]]; then
+ initialization::ga_output mssql-versions \
+ "$(initialization::parameters_to_json
"${CURRENT_MSSQL_VERSIONS[@]}")"
+ else
+ initialization::ga_output mssql-versions \
+ "$(initialization::parameters_to_json "${MSSQL_VERSION}")"
+ fi
+ initialization::ga_output default-mssql-version "${MSSQL_VERSION}"
+
+
+
initialization::ga_output kind-versions \
"$(initialization::parameters_to_json "${CURRENT_KIND_VERSIONS[@]}")"
initialization::ga_output default-kind-version "${KIND_VERSION}"
@@ -117,10 +127,12 @@ function output_all_basic_variables() {
if [[ ${FULL_TESTS_NEEDED_LABEL} == "true" ]]; then
initialization::ga_output postgres-exclude '[{ "python-version": "3.6"
}]'
initialization::ga_output mysql-exclude '[{ "python-version": "3.7" }]'
+ initialization::ga_output mssql-exclude '[{ "python-version": "3.7" }]'
initialization::ga_output sqlite-exclude '[{ "python-version": "3.8"
}]'
else
initialization::ga_output postgres-exclude '[]'
initialization::ga_output mysql-exclude '[]'
+ initialization::ga_output mssql-exclude '[]'
initialization::ga_output sqlite-exclude '[]'
fi
diff --git a/scripts/ci/testing/ci_run_airflow_testing.sh
b/scripts/ci/testing/ci_run_airflow_testing.sh
index fa8c044..9368b87 100755
--- a/scripts/ci/testing/ci_run_airflow_testing.sh
+++ b/scripts/ci/testing/ci_run_airflow_testing.sh
@@ -96,6 +96,12 @@ function run_all_test_types_in_parallel() {
# Remove Integration from list of tests to run in parallel
test_types_to_run="${test_types_to_run//Integration/}"
run_integration_tests_separately="true"
+ if [[ ${BACKEND} == "mssql" ]]; then
+ # Skip running "Integration" tests for low memory condition for
mssql
+ run_integration_tests_separately="false"
+ else
+ run_integration_tests_separately="true"
+ fi
fi
fi
set +e
diff --git a/scripts/in_container/check_environment.sh
b/scripts/in_container/check_environment.sh
index 22c6fe5..3237bc9 100755
--- a/scripts/in_container/check_environment.sh
+++ b/scripts/in_container/check_environment.sh
@@ -98,6 +98,8 @@ function check_db_backend {
check_service "PostgreSQL" "run_nc postgres 5432" "${MAX_CHECK}"
elif [[ ${BACKEND} == "mysql" ]]; then
check_service "MySQL" "run_nc mysql 3306" "${MAX_CHECK}"
+ elif [[ ${BACKEND} == "mssql" ]]; then
+ check_service "MSSQL" "run_nc mssql 1433" "${MAX_CHECK}"
elif [[ ${BACKEND} == "sqlite" ]]; then
return
else
diff --git a/tests/bats/breeze/test_breeze_complete.bats
b/tests/bats/breeze/test_breeze_complete.bats
index c1dfed1..249a493 100644
--- a/tests/bats/breeze/test_breeze_complete.bats
+++ b/tests/bats/breeze/test_breeze_complete.bats
@@ -239,6 +239,22 @@
assert_equal "${_breeze_default_mysql_version}" "${MYSQL_VERSION}"
}
+@test "Test allowed MSSQL versions same as CURRENT" {
+ load ../bats_utils
+ #shellcheck source=breeze-complete
+ source "${AIRFLOW_SOURCES}/breeze-complete"
+
+ assert_equal "${_breeze_allowed_mssql_versions}"
"${CURRENT_MSSQL_VERSIONS[*]}"
+}
+
+@test "Test default MSSQL version same as MSSQL_VERSION" {
+ load ../bats_utils
+ #shellcheck source=breeze-complete
+ source "${AIRFLOW_SOURCES}/breeze-complete"
+
+ assert_equal "${_breeze_default_mssql_version}" "${MSSQL_VERSION}"
+}
+
@test "Test allowed Postgres versions same as CURRENT" {
load ../bats_utils
#shellcheck source=breeze-complete
diff --git a/tests/models/test_renderedtifields.py
b/tests/models/test_renderedtifields.py
index d83f542..bd88d5d 100644
--- a/tests/models/test_renderedtifields.py
+++ b/tests/models/test_renderedtifields.py
@@ -172,7 +172,14 @@ class TestRenderedTaskInstanceFields(unittest.TestCase):
assert rtif_num == len(result)
# Verify old records are deleted and only 'num_to_keep' records are
kept
- with assert_queries_count(expected_query_count):
+ # For MSSQL, an extra query is fired in RenderedTaskInstanceFields.delete_old_records
+ # compared to the other DBs
+ expected_query_count_based_on_db = (
+ expected_query_count + 1
+ if session.bind.dialect.name == "mssql" and expected_query_count != 0
+ else expected_query_count
+ )
+
+ with assert_queries_count(expected_query_count_based_on_db):
RTIF.delete_old_records(task_id=task.task_id, dag_id=task.dag_id,
num_to_keep=num_to_keep)
result = session.query(RTIF).filter(RTIF.dag_id == dag.dag_id,
RTIF.task_id == task.task_id).all()
assert remaining_rtifs == len(result)
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index 35c00fe..9c2beb6 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2098,8 +2098,15 @@ class TestRunRawTaskQueriesCount(unittest.TestCase):
run_type=DagRunType.SCHEDULED,
session=session,
)
+ # For MSSQL, an extra query is fired in RenderedTaskInstanceFields.delete_old_records
+ # compared to the other DBs; delete_old_records is called only when mark_success is False
+ expected_query_count_based_on_db = (
+ expected_query_count + 1
+ if session.bind.dialect.name == "mssql" and expected_query_count > 0 and not mark_success
+ else expected_query_count
+ )
- with assert_queries_count(expected_query_count):
+ with assert_queries_count(expected_query_count_based_on_db):
ti._run_raw_task(mark_success=mark_success)
def test_execute_queries_count_store_serialized(self):
@@ -2116,8 +2123,11 @@ class TestRunRawTaskQueriesCount(unittest.TestCase):
run_type=DagRunType.SCHEDULED,
session=session,
)
+ # For MSSQL, an extra query is fired in RenderedTaskInstanceFields.delete_old_records
+ # compared to the other DBs
+ expected_query_count_based_on_db = 13 if session.bind.dialect.name == "mssql" else 12
- with assert_queries_count(12):
+ with assert_queries_count(expected_query_count_based_on_db):
ti._run_raw_task()
def test_operator_field_with_serialization(self):
diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py
index e608ff1..601dc6f 100644
--- a/tests/utils/test_db.py
+++ b/tests/utils/test_db.py
@@ -68,6 +68,12 @@ class TestDb(unittest.TestCase):
lambda t: (t[0] == 'remove_index' and t[1].name ==
'permission_view_id'),
# from test_security unit test
lambda t: (t[0] == 'remove_table' and t[1].name == 'some_model'),
+ # MSSQL default tables
+ lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_monitor'),
+ lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_db'),
+ lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_usg'),
+ lambda t: (t[0] == 'remove_table' and t[1].name == 'MSreplication_options'),
+ lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_dev'),
]
for ignore in ignores:
diff = [d for d in diff if not ignore(d)]