This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new f2b7d05  Improve compatibility with mssql (#9973)
f2b7d05 is described below

commit f2b7d05db5042126e40767cc5a707896fb7db1de
Author: Aneesh Joseph <[email protected]>
AuthorDate: Wed May 26 14:56:51 2021 +0530

    Improve compatibility with mssql (#9973)
    
    This PR adds experimental support for MSSQL as a metadata database backend.
---
 .github/workflows/ci.yml                           |  63 +++++
 BREEZE.rst                                         |   4 +-
 Dockerfile.ci                                      |  11 +
 README.md                                          |  15 +-
 airflow/api_connexion/parameters.py                |  13 +-
 airflow/jobs/scheduler_job.py                      |  45 ++--
 .../83f031fd9f1c_improve_mssql_compatibility.py    | 257 +++++++++++++++++++++
 ...d48763f6d53_add_unique_constraint_to_conn_id.py |   2 +-
 ...606e2_add_scheduling_decision_to_dagrun_and_.py |  28 ++-
 .../bbf4a7ad0465_remove_id_column_from_xcom.py     |  67 +++++-
 ...4a3141f0_make_xcom_pkey_columns_non_nullable.py |  76 ++++++
 airflow/models/dag.py                              |  15 +-
 airflow/models/dagcode.py                          |   5 +-
 airflow/models/dagrun.py                           |   5 +-
 airflow/models/serialized_dag.py                   |  21 +-
 airflow/sensors/smart_sensor.py                    |  29 ++-
 airflow/www/security.py                            |   8 +
 airflow/www/views.py                               |  16 +-
 breeze                                             |   7 +
 breeze-complete                                    |   9 +-
 docs/apache-airflow/installation.rst               |   1 +
 .../{base.yml => backend-mssql-port.yml}           |  22 +-
 .../docker-compose/{base.yml => backend-mssql.yml} |  29 +--
 scripts/ci/docker-compose/base.yml                 |   1 +
 scripts/ci/libraries/_initialization.sh            |  10 +
 scripts/ci/selective_ci_checks.sh                  |  14 +-
 scripts/ci/testing/ci_run_airflow_testing.sh       |   6 +
 scripts/in_container/check_environment.sh          |   2 +
 tests/bats/breeze/test_breeze_complete.bats        |  16 ++
 tests/models/test_renderedtifields.py              |   9 +-
 tests/models/test_taskinstance.py                  |  14 +-
 tests/utils/test_db.py                             |   6 +
 32 files changed, 713 insertions(+), 113 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index b026840..3203fa3 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -145,6 +145,7 @@ jobs:
       postgresVersions: ${{ steps.selective-checks.outputs.postgres-versions }}
       defaultPostgresVersion: ${{ 
steps.selective-checks.outputs.default-postgres-version }}
       mysqlVersions: ${{ steps.selective-checks.outputs.mysql-versions }}
+      mssqlVersions: ${{ steps.selective-checks.outputs.mssql-versions }}
       defaultMySQLVersion: ${{ 
steps.selective-checks.outputs.default-mysql-version }}
       helmVersions: ${{ steps.selective-checks.outputs.helm-versions }}
       defaultHelmVersion: ${{ 
steps.selective-checks.outputs.default-helm-version }}
@@ -153,6 +154,7 @@ jobs:
       testTypes: ${{ steps.selective-checks.outputs.test-types }}
       postgresExclude: ${{ steps.selective-checks.outputs.postgres-exclude }}
       mysqlExclude: ${{ steps.selective-checks.outputs.mysql-exclude }}
+      mssqlExclude: ${{ steps.selective-checks.outputs.mssql-exclude }}
       sqliteExclude: ${{ steps.selective-checks.outputs.sqlite-exclude }}
       run-tests: ${{ steps.selective-checks.outputs.run-tests }}
       run-ui-tests: ${{ steps.selective-checks.outputs.run-ui-tests }}
@@ -762,6 +764,62 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           path: "./files/coverage*.xml"
           retention-days: 7
 
+  tests-mssql:
+    timeout-minutes: 130
+    name: >
+      MSSQL${{matrix.mssql-version}}, Py${{matrix.python-version}}: 
${{needs.build-info.outputs.testTypes}}
+    runs-on: ${{ fromJson(needs.build-info.outputs.runsOn) }}
+    needs: [build-info, ci-images]
+    strategy:
+      matrix:
+        python-version: ${{ fromJson(needs.build-info.outputs.pythonVersions) 
}}
+        mssql-version: ${{ fromJson(needs.build-info.outputs.mssqlVersions) }}
+        exclude: ${{ fromJson(needs.build-info.outputs.mssqlExclude) }}
+      fail-fast: false
+    env:
+      RUNS_ON: ${{ fromJson(needs.build-info.outputs.runsOn) }}
+      BACKEND: mssql
+      PYTHON_MAJOR_MINOR_VERSION: ${{ matrix.python-version }}
+      MSSQL_VERSION: ${{ matrix.mssql-version }}
+      TEST_TYPES: "${{needs.build-info.outputs.testTypes}}"
+      GITHUB_REGISTRY: ${{ needs.ci-images.outputs.githubRegistry }}
+    if: needs.build-info.outputs.run-tests == 'true'
+    steps:
+      - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
+        uses: actions/checkout@v2
+        with:
+          persist-credentials: false
+      - name: "Setup python"
+        uses: actions/setup-python@v2
+        with:
+          python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
+      - name: "Free space"
+        run: ./scripts/ci/tools/ci_free_space_on_ci.sh
+      - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ 
env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
+        run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
+      - name: "Tests: ${{needs.build-info.outputs.testTypes}}"
+        run: ./scripts/ci/testing/ci_run_airflow_testing.sh
+      - name: "Upload airflow logs"
+        uses: actions/upload-artifact@v2
+        if: failure()
+        with:
+          name: 
airflow-logs-${{matrix.python-version}}-${{matrix.mssql-version}}
+          path: "./files/airflow_logs*"
+          retention-days: 7
+      - name: "Upload container logs"
+        uses: actions/upload-artifact@v2
+        if: failure()
+        with:
+          name: 
container-logs-mssql-${{matrix.python-version}}-${{matrix.mssql-version}}
+          path: "./files/container_logs*"
+          retention-days: 7
+      - name: "Upload artifact for coverage"
+        uses: actions/upload-artifact@v2
+        with:
+          name: 
coverage-mssql-${{matrix.python-version}}-${{matrix.mssql-version}}
+          path: "./files/coverage*.xml"
+          retention-days: 7
+
   tests-sqlite:
     timeout-minutes: 130
     name: >
@@ -898,6 +956,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
       - tests-postgres
       - tests-sqlite
       - tests-mysql
+      - tests-mssql
       - tests-quarantined
     env:
       RUNS_ON: ${{ fromJson(needs.build-info.outputs.runsOn) }}
@@ -1046,6 +1105,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
       - tests-sqlite
       - tests-postgres
       - tests-mysql
+      - tests-mssql
       - tests-kubernetes
       - prod-images
       - docs
@@ -1107,6 +1167,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
       - tests-sqlite
       - tests-postgres
       - tests-mysql
+      - tests-mssql
       - tests-kubernetes
       - ci-images
       - docs
@@ -1151,6 +1212,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
       - static-checks-pylint
       - tests-sqlite
       - tests-mysql
+      - tests-mssql
       - tests-postgres
       - tests-kubernetes
     env:
@@ -1221,6 +1283,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
       - tests-sqlite
       - tests-postgres
       - tests-mysql
+      - tests-mssql
       - tests-kubernetes
       - constraints
       - prepare-test-provider-packages-wheel
diff --git a/BREEZE.rst b/BREEZE.rst
index ed2b4fd..2f0510a 100644
--- a/BREEZE.rst
+++ b/BREEZE.rst
@@ -1883,7 +1883,7 @@ This is the current syntax for  `./breeze <./breeze>`_:
           Backend to use for tests - it determines which database is used.
           One of:
 
-                 sqlite mysql postgres
+                 sqlite mysql postgres mssql
 
           Default: sqlite
 
@@ -2349,7 +2349,7 @@ This is the current syntax for  `./breeze <./breeze>`_:
           Backend to use for tests - it determines which database is used.
           One of:
 
-                 sqlite mysql postgres
+                 sqlite mysql postgres mssql
 
           Default: sqlite
 
diff --git a/Dockerfile.ci b/Dockerfile.ci
index 92b6a45..14e4294 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -161,6 +161,17 @@ RUN mkdir -pv /usr/share/man/man1 \
     && apt-get autoremove -yqq --purge \
     && apt-get clean \
     && rm -rf /var/lib/apt/lists/* \
+    && curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - \
+    && curl https://packages.microsoft.com/config/debian/9/prod.list > 
/etc/apt/sources.list.d/mssql-release.list \
+    && apt-get update -yqq \
+    && apt-get upgrade -yqq \
+    && ACCEPT_EULA=Y apt-get -yqq install -y --no-install-recommends \
+      gcc \
+      unixodbc-dev  \
+      g++ \
+      msodbcsql17 \
+      mssql-tools \
+    && rm -rf /var/lib/apt/lists/* \
     && curl 
https://download.docker.com/linux/static/stable/x86_64/docker-${DOCKER_CLI_VERSION}.tgz
 \
     |  tar -C /usr/bin --strip-components=1 -xvzf - docker/docker
 
diff --git a/README.md b/README.md
index c9a8c86..d0d2e17 100644
--- a/README.md
+++ b/README.md
@@ -97,13 +97,14 @@ We **highly** recommend upgrading to the latest Airflow 
major release at the ear
 
 Apache Airflow is tested with:
 
-|              | Master version (dev)      | Stable version (2.0.2)   | Previous version (1.10.15) |
-| ------------ | ------------------------- | ------------------------ | -------------------------  |
-| Python       | 3.6, 3.7, 3.8             | 3.6, 3.7, 3.8            | 2.7, 3.5, 3.6, 3.7, 3.8    |
-| Kubernetes   | 1.20, 1.19, 1.18          | 1.20, 1.19, 1.18         | 1.18, 1.17, 1.16           |
-| PostgreSQL   | 9.6, 10, 11, 12, 13       | 9.6, 10, 11, 12, 13      | 9.6, 10, 11, 12, 13        |
-| MySQL        | 5.7, 8                    | 5.7, 8                   | 5.6, 5.7                   |
-| SQLite       | 3.15.0+                   | 3.15.0+                  | 3.15.0+                    |
+|                      | Master version (dev)      | Stable version (2.0.2)   | Previous version (1.10.15) |
+| -------------------- | ------------------------- | ------------------------ | -------------------------  |
+| Python               | 3.6, 3.7, 3.8             | 3.6, 3.7, 3.8            | 2.7, 3.5, 3.6, 3.7, 3.8    |
+| Kubernetes           | 1.20, 1.19, 1.18          | 1.20, 1.19, 1.18         | 1.18, 1.17, 1.16           |
+| PostgreSQL           | 9.6, 10, 11, 12, 13       | 9.6, 10, 11, 12, 13      | 9.6, 10, 11, 12, 13        |
+| MySQL                | 5.7, 8                    | 5.7, 8                   | 5.6, 5.7                   |
+| SQLite               | 3.15.0+                   | 3.15.0+                  | 3.15.0+                    |
+| MSSQL (Experimental) | 2017, 2019                |                          |                            |
 
 **Note:** MySQL 5.x versions are unable to or have limitations with
 running multiple schedulers -- please see the [Scheduler 
docs](https://airflow.apache.org/docs/apache-airflow/stable/scheduler.html).
diff --git a/airflow/api_connexion/parameters.py 
b/airflow/api_connexion/parameters.py
index 8e06301..1dac50b 100644
--- a/airflow/api_connexion/parameters.py
+++ b/airflow/api_connexion/parameters.py
@@ -18,7 +18,7 @@ from functools import wraps
 from typing import Callable, Dict, TypeVar, cast
 
 from pendulum.parsing import ParserError
-from sqlalchemy import asc, desc
+from sqlalchemy import text
 
 from airflow.api_connexion.exceptions import BadRequest
 from airflow.configuration import conf
@@ -97,11 +97,10 @@ def apply_sorting(query, order_by, to_replace=None, 
allowed_attrs=None):
             detail=f"Ordering with '{lstriped_orderby}' is disallowed or "
             f"the attribute does not exist on the model"
         )
+    if to_replace:
+        lstriped_orderby = to_replace.get(lstriped_orderby, lstriped_orderby)
     if order_by[0] == "-":
-        func = desc
-        order_by = lstriped_orderby
+        order_by = f"{lstriped_orderby} desc"
     else:
-        func = asc
-    if to_replace:
-        order_by = to_replace.get(order_by, order_by)
-    return query.order_by(func(order_by))
+        order_by = f"{lstriped_orderby} asc"
+    return query.order_by(text(order_by))
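
A minimal, self-contained sketch of the text()-based ordering that apply_sorting() now emits instead of sqlalchemy.asc()/desc() (toy model and in-memory SQLite assumed; Run and state are illustrative names, not Airflow's):

from sqlalchemy import Column, Integer, String, create_engine, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Run(Base):  # hypothetical stand-in for an Airflow model
    __tablename__ = "run"
    id = Column(Integer, primary_key=True)
    state = Column(String(50))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

order_by = "-state"  # a leading '-' requests descending order, as in the API
attr = order_by.lstrip("-")
direction = "desc" if order_by.startswith("-") else "asc"
# The attribute must already be validated against allowed_attrs before it is
# interpolated into the text() clause.
print(session.query(Run).order_by(text(f"{attr} {direction}")).all())
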
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index b400ae9..bf4ad42 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -37,6 +37,7 @@ from sqlalchemy import and_, func, not_, or_, tuple_
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import load_only, selectinload
 from sqlalchemy.orm.session import Session, make_transient
+from sqlalchemy.sql import expression
 
 from airflow import models, settings
 from airflow.configuration import conf
@@ -1067,15 +1068,15 @@ class SchedulerJob(BaseJob):  # pylint: 
disable=too-many-instance-attributes
 
         task_instance_str = "\n\t".join(repr(x) for x in executable_tis)
         self.log.info("Setting the following tasks to queued state:\n\t%s", 
task_instance_str)
-
-        # set TIs to queued state
-        filter_for_tis = TI.filter_for_tis(executable_tis)
-        session.query(TI).filter(filter_for_tis).update(
-            # TODO[ha]: should we use func.now()? How does that work with DB 
timezone on mysql when it's not
-            # UTC?
-            {TI.state: State.QUEUED, TI.queued_dttm: timezone.utcnow(), 
TI.queued_by_job_id: self.id},
-            synchronize_session=False,
-        )
+        if len(executable_tis) > 0:
+            # set TIs to queued state
+            filter_for_tis = TI.filter_for_tis(executable_tis)
+            session.query(TI).filter(filter_for_tis).update(
+                # TODO[ha]: should we use func.now()? How does that work with 
DB timezone
+                # on mysql when it's not UTC?
+                {TI.state: State.QUEUED, TI.queued_dttm: timezone.utcnow(), 
TI.queued_by_job_id: self.id},
+                synchronize_session=False,
+            )
 
         for ti in executable_tis:
             make_transient(ti)
@@ -1580,14 +1581,24 @@ class SchedulerJob(BaseJob):  # pylint: 
disable=too-many-instance-attributes
         # as DagModel.dag_id and DagModel.next_dagrun
         # This list is used to verify if the DagRun already exist so that we 
don't attempt to create
         # duplicate dag runs
-        active_dagruns = (
-            session.query(DagRun.dag_id, DagRun.execution_date)
-            .filter(
-                tuple_(DagRun.dag_id, DagRun.execution_date).in_(
-                    [(dm.dag_id, dm.next_dagrun) for dm in dag_models]
-                )
+
+        if session.bind.dialect.name == 'mssql':
+            active_dagruns_filter = or_(
+                *[
+                    and_(
+                        DagRun.dag_id == dm.dag_id,
+                        DagRun.execution_date == dm.next_dagrun,
+                    )
+                    for dm in dag_models
+                ]
             )
-            .all()
+        else:
+            active_dagruns_filter = tuple_(DagRun.dag_id, 
DagRun.execution_date).in_(
+                [(dm.dag_id, dm.next_dagrun) for dm in dag_models]
+            )
+
+        active_dagruns = (
+            session.query(DagRun.dag_id, 
DagRun.execution_date).filter(active_dagruns_filter).all()
         )
 
         for dag_model in dag_models:
@@ -1644,7 +1655,7 @@ class SchedulerJob(BaseJob):  # pylint: 
disable=too-many-instance-attributes
             .filter(
                 DagRun.dag_id.in_([o.dag_id for o in dag_models]),
                 DagRun.state == State.RUNNING,  # pylint: 
disable=comparison-with-callable
-                DagRun.external_trigger.is_(False),
+                DagRun.external_trigger == expression.false(),
             )
             .group_by(DagRun.dag_id)
             .all()
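
MSSQL does not support tuple-valued IN clauses, which is why the scheduler change above expands the (dag_id, execution_date) pairs into OR-ed AND terms. A hedged sketch of that filter construction, assuming DagRun and dag_models are supplied by the caller:

from sqlalchemy import and_, or_, tuple_


def active_dagruns_filter(session, DagRun, dag_models):
    # Pairs of (dag_id, next_dagrun) that may already have a DagRun created.
    pairs = [(dm.dag_id, dm.next_dagrun) for dm in dag_models]
    if session.bind.dialect.name == "mssql":
        # MSSQL: expand into OR-ed AND clauses, one per pair.
        return or_(
            *(
                and_(DagRun.dag_id == dag_id, DagRun.execution_date == when)
                for dag_id, when in pairs
            )
        )
    # Other backends keep the compact tuple-IN form.
    return tuple_(DagRun.dag_id, DagRun.execution_date).in_(pairs)
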
diff --git 
a/airflow/migrations/versions/83f031fd9f1c_improve_mssql_compatibility.py 
b/airflow/migrations/versions/83f031fd9f1c_improve_mssql_compatibility.py
new file mode 100644
index 0000000..ce5f3fc
--- /dev/null
+++ b/airflow/migrations/versions/83f031fd9f1c_improve_mssql_compatibility.py
@@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""improve mssql compatibility
+
+Revision ID: 83f031fd9f1c
+Revises: a13f7613ad25
+Create Date: 2021-04-06 12:22:02.197726
+
+"""
+
+from collections import defaultdict
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mssql
+
+# revision identifiers, used by Alembic.
+revision = '83f031fd9f1c'
+down_revision = 'a13f7613ad25'
+branch_labels = None
+depends_on = None
+
+
+def is_table_empty(conn, table_name):
+    """
+    This function checks if the MS SQL table is empty
+
+    :param conn: SQL connection object
+    :param table_name: table name
+    :return: Boolean indicating whether the table is empty
+    """
+    return conn.execute(f'select TOP 1 * from {table_name}').first() is None
+
+
+def get_table_constraints(conn, table_name):
+    """
+    This function returns the primary and unique constraints
+    along with their column names. Some tables, such as task_instance,
+    do not declare a primary key constraint name, so SQL Server
+    auto-generates one; this function helps retrieve any primary
+    or unique constraint name for the table.
+
+    :param conn: sql connection object
+    :param table_name: table name
+    :return: a dictionary of ((constraint name, constraint type), column name) 
of table
+    :rtype: defaultdict(list)
+    """
+    query = """SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
+     FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
+     JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON 
ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
+     WHERE tc.TABLE_NAME = '{table_name}' AND
+     (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 
'UNIQUE')
+    """.format(
+        table_name=table_name
+    )
+    result = conn.execute(query).fetchall()
+    constraint_dict = defaultdict(list)
+    for constraint, constraint_type, column in result:
+        constraint_dict[(constraint, constraint_type)].append(column)
+    return constraint_dict
+
+
+def drop_column_constraints(operator, column_name, constraint_dict):
+    """
+    Drop a primary key or unique constraint
+
+    :param operator: batch_alter_table for the table
+    :param constraint_dict: a dictionary of ((constraint name, constraint 
type), column name) of table
+    """
+    for constraint, columns in constraint_dict.items():
+        if column_name in columns:
+            if constraint[1].lower().startswith("primary"):
+                operator.drop_constraint(constraint[0], type_='primary')
+            elif constraint[1].lower().startswith("unique"):
+                operator.drop_constraint(constraint[0], type_='unique')
+
+
+def create_constraints(operator, column_name, constraint_dict):
+    """
+    Create a primary key or unique constraint
+
+    :param operator: batch_alter_table for the table
+    :param constraint_dict: a dictionary of ((constraint name, constraint 
type), column name) of table
+    """
+    for constraint, columns in constraint_dict.items():
+        if column_name in columns:
+            if constraint[1].lower().startswith("primary"):
+                operator.create_primary_key(constraint_name=constraint[0], 
columns=columns)
+            elif constraint[1].lower().startswith("unique"):
+                
operator.create_unique_constraint(constraint_name=constraint[0], 
columns=columns)
+
+
+def _use_date_time2(conn):
+    result = conn.execute(
+        """SELECT CASE WHEN CONVERT(VARCHAR(128), SERVERPROPERTY 
('productversion'))
+        like '8%' THEN '2000' WHEN CONVERT(VARCHAR(128), SERVERPROPERTY 
('productversion'))
+        like '9%' THEN '2005' ELSE '2005Plus' END AS MajorVersion"""
+    ).fetchone()
+    mssql_version = result[0]
+    return mssql_version not in ("2000", "2005")
+
+
+def _is_timestamp(conn, table_name, column_name):
+    query = f"""SELECT
+    TYPE_NAME(C.USER_TYPE_ID) AS DATA_TYPE
+    FROM SYS.COLUMNS C
+    JOIN SYS.TYPES T
+    ON C.USER_TYPE_ID=T.USER_TYPE_ID
+    WHERE C.OBJECT_ID=OBJECT_ID('{table_name}') and C.NAME='{column_name}';
+    """
+    column_type = conn.execute(query).fetchone()[0]
+    return column_type == "timestamp"
+
+
+def recreate_mssql_ts_column(conn, op, table_name, column_name):
+    """
+    Drop the timestamp column and recreate it as
+    datetime or datetime2(6)
+    """
+    if _is_timestamp(conn, table_name, column_name) and is_table_empty(conn, 
table_name):
+        with op.batch_alter_table(table_name) as batch_op:
+            constraint_dict = get_table_constraints(conn, table_name)
+            drop_column_constraints(batch_op, column_name, constraint_dict)
+            batch_op.drop_column(column_name=column_name)
+            if _use_date_time2(conn):
+                batch_op.add_column(sa.Column(column_name, 
mssql.DATETIME2(precision=6), nullable=False))
+            else:
+                batch_op.add_column(sa.Column(column_name, mssql.DATETIME, 
nullable=False))
+            create_constraints(batch_op, column_name, constraint_dict)
+
+
+def alter_mssql_datetime_column(conn, op, table_name, column_name, nullable):
+    """Update the datetime column to datetime2(6)"""
+    if _use_date_time2(conn):
+        op.alter_column(
+            table_name=table_name,
+            column_name=column_name,
+            type_=mssql.DATETIME2(precision=6),
+            nullable=nullable,
+        )
+
+
+def alter_mssql_datetime2_column(conn, op, table_name, column_name, nullable):
+    """Update the datetime2(6) column to datetime"""
+    if _use_date_time2(conn):
+        op.alter_column(
+            table_name=table_name, column_name=column_name, 
type_=mssql.DATETIME, nullable=nullable
+        )
+
+
+def _get_timestamp(conn):
+    if _use_date_time2(conn):
+        return mssql.DATETIME2(precision=6)
+    else:
+        return mssql.DATETIME
+
+
+def upgrade():
+    """Improve compatibility with MSSQL backend"""
+    conn = op.get_bind()
+    if conn.dialect.name != 'mssql':
+        return
+    recreate_mssql_ts_column(conn, op, 'dag_code', 'last_updated')
+    recreate_mssql_ts_column(conn, op, 'rendered_task_instance_fields', 
'execution_date')
+    alter_mssql_datetime_column(conn, op, 'serialized_dag', 'last_updated', 
False)
+    op.alter_column(table_name="xcom", column_name="timestamp", 
type_=_get_timestamp(conn), nullable=False)
+    with op.batch_alter_table('task_reschedule') as task_reschedule_batch_op:
+        task_reschedule_batch_op.alter_column(
+            column_name='end_date', type_=_get_timestamp(conn), nullable=False
+        )
+        task_reschedule_batch_op.alter_column(
+            column_name='reschedule_date', type_=_get_timestamp(conn), 
nullable=False
+        )
+        task_reschedule_batch_op.alter_column(
+            column_name='start_date', type_=_get_timestamp(conn), 
nullable=False
+        )
+    with op.batch_alter_table('task_fail') as task_fail_batch_op:
+        task_fail_batch_op.drop_index('idx_task_fail_dag_task_date')
+        task_fail_batch_op.alter_column(
+            column_name="execution_date", type_=_get_timestamp(conn), 
nullable=False
+        )
+        task_fail_batch_op.create_index(
+            'idx_task_fail_dag_task_date', ['dag_id', 'task_id', 
'execution_date'], unique=False
+        )
+    with op.batch_alter_table('task_instance') as task_instance_batch_op:
+        task_instance_batch_op.drop_index('ti_state_lkp')
+        task_instance_batch_op.create_index(
+            'ti_state_lkp', ['dag_id', 'task_id', 'execution_date', 'state'], 
unique=False
+        )
+    constraint_dict = get_table_constraints(conn, 'dag_run')
+    for constraint, columns in constraint_dict.items():
+        if 'dag_id' in columns:
+            if constraint[1].lower().startswith("unique"):
+                op.drop_constraint(constraint[0], 'dag_run', type_='unique')
+    # create filtered indexes
+    conn.execute(
+        """CREATE UNIQUE NONCLUSTERED INDEX idx_not_null_dag_id_execution_date
+                ON dag_run(dag_id,execution_date)
+                WHERE dag_id IS NOT NULL and execution_date is not null"""
+    )
+    conn.execute(
+        """CREATE UNIQUE NONCLUSTERED INDEX idx_not_null_dag_id_run_id
+                 ON dag_run(dag_id,run_id)
+                 WHERE dag_id IS NOT NULL and run_id is not null"""
+    )
+
+
+def downgrade():
+    """Reverse MSSQL backend compatibility improvements"""
+    conn = op.get_bind()
+    if conn.dialect.name != 'mssql':
+        return
+    alter_mssql_datetime2_column(conn, op, 'serialized_dag', 'last_updated', 
False)
+    op.alter_column(table_name="xcom", column_name="timestamp", 
type_=_get_timestamp(conn), nullable=True)
+    with op.batch_alter_table('task_reschedule') as task_reschedule_batch_op:
+        task_reschedule_batch_op.alter_column(
+            column_name='end_date', type_=_get_timestamp(conn), nullable=True
+        )
+        task_reschedule_batch_op.alter_column(
+            column_name='reschedule_date', type_=_get_timestamp(conn), 
nullable=True
+        )
+        task_reschedule_batch_op.alter_column(
+            column_name='start_date', type_=_get_timestamp(conn), nullable=True
+        )
+    with op.batch_alter_table('task_fail') as task_fail_batch_op:
+        task_fail_batch_op.drop_index('idx_task_fail_dag_task_date')
+        task_fail_batch_op.alter_column(
+            column_name="execution_date", type_=_get_timestamp(conn), 
nullable=False
+        )
+        task_fail_batch_op.create_index(
+            'idx_task_fail_dag_task_date', ['dag_id', 'task_id', 
'execution_date'], unique=False
+        )
+    with op.batch_alter_table('task_instance') as task_instance_batch_op:
+        task_instance_batch_op.drop_index('ti_state_lkp')
+        task_instance_batch_op.create_index(
+            'ti_state_lkp', ['dag_id', 'task_id', 'execution_date'], 
unique=False
+        )
+    op.create_unique_constraint('UQ__dag_run__dag_id_run_id', 'dag_run', 
['dag_id', 'run_id'])
+    op.create_unique_constraint('UQ__dag_run__dag_id_execution_date', 
'dag_run', ['dag_id', 'execution_date'])
+    op.drop_index('idx_not_null_dag_id_execution_date', table_name='dag_run')
+    op.drop_index('idx_not_null_dag_id_run_id', table_name='dag_run')
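
An illustrative sketch (hypothetical constraint names) of the mapping returned by get_table_constraints() above and how drop_column_constraints()/create_constraints() walk it:

from collections import defaultdict

# Keys are (constraint name, constraint type); values are the column lists.
constraint_dict = defaultdict(list)
constraint_dict[("PK__task_ins__ABC123", "PRIMARY KEY")] = [
    "task_id",
    "dag_id",
    "execution_date",
]
constraint_dict[("unique_conn_id", "UNIQUE")] = ["conn_id"]

column_name = "execution_date"
for (name, ctype), columns in constraint_dict.items():
    if column_name in columns:
        if ctype.lower().startswith("primary"):
            print(f"drop and later recreate primary key {name} on {columns}")
        elif ctype.lower().startswith("unique"):
            print(f"drop and later recreate unique constraint {name} on {columns}")
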
diff --git 
a/airflow/migrations/versions/8d48763f6d53_add_unique_constraint_to_conn_id.py 
b/airflow/migrations/versions/8d48763f6d53_add_unique_constraint_to_conn_id.py
index 2c743e4..44be988 100644
--- 
a/airflow/migrations/versions/8d48763f6d53_add_unique_constraint_to_conn_id.py
+++ 
b/airflow/migrations/versions/8d48763f6d53_add_unique_constraint_to_conn_id.py
@@ -38,9 +38,9 @@ def upgrade():
     """Apply add unique constraint to conn_id and set it as non-nullable"""
     try:
         with op.batch_alter_table('connection') as batch_op:
+            batch_op.alter_column("conn_id", nullable=False, 
existing_type=sa.String(250))
             
batch_op.create_unique_constraint(constraint_name="unique_conn_id", 
columns=["conn_id"])
 
-            batch_op.alter_column("conn_id", nullable=False, 
existing_type=sa.String(250))
     except sa.exc.IntegrityError:
         raise Exception("Make sure there are no duplicate connections with the 
same conn_id or null values")
 
diff --git 
a/airflow/migrations/versions/98271e7606e2_add_scheduling_decision_to_dagrun_and_.py
 
b/airflow/migrations/versions/98271e7606e2_add_scheduling_decision_to_dagrun_and_.py
index 8019aa2..d220d32 100644
--- 
a/airflow/migrations/versions/98271e7606e2_add_scheduling_decision_to_dagrun_and_.py
+++ 
b/airflow/migrations/versions/98271e7606e2_add_scheduling_decision_to_dagrun_and_.py
@@ -26,7 +26,7 @@ Create Date: 2020-10-01 12:13:32.968148
 
 import sqlalchemy as sa
 from alembic import op
-from sqlalchemy.dialects import mysql
+from sqlalchemy.dialects import mssql, mysql
 
 # revision identifiers, used by Alembic.
 revision = '98271e7606e2'
@@ -35,12 +35,32 @@ branch_labels = None
 depends_on = None
 
 
+def _use_date_time2(conn):
+    result = conn.execute(
+        """SELECT CASE WHEN CONVERT(VARCHAR(128), SERVERPROPERTY 
('productversion'))
+        like '8%' THEN '2000' WHEN CONVERT(VARCHAR(128), SERVERPROPERTY 
('productversion'))
+        like '9%' THEN '2005' ELSE '2005Plus' END AS MajorVersion"""
+    ).fetchone()
+    mssql_version = result[0]
+    return mssql_version not in ("2000", "2005")
+
+
+def _get_timestamp(conn):
+    dialect_name = conn.dialect.name
+    if dialect_name == "mssql":
+        return mssql.DATETIME2(precision=6) if _use_date_time2(conn) else 
mssql.DATETIME
+    elif dialect_name != "mysql":
+        return sa.TIMESTAMP(timezone=True)
+    else:
+        return mysql.TIMESTAMP(fsp=6, timezone=True)
+
+
 def upgrade():
     """Apply Add scheduling_decision to DagRun and DAG"""
     conn = op.get_bind()  # pylint: disable=no-member
-    is_mysql = bool(conn.dialect.name == "mysql")
     is_sqlite = bool(conn.dialect.name == "sqlite")
-    timestamp = sa.TIMESTAMP(timezone=True) if not is_mysql else 
mysql.TIMESTAMP(fsp=6, timezone=True)
+    is_mssql = bool(conn.dialect.name == "mssql")
+    timestamp = _get_timestamp(conn)
 
     if is_sqlite:
         op.execute("PRAGMA foreign_keys=off")
@@ -71,7 +91,7 @@ def upgrade():
 
     op.execute(
         "UPDATE dag SET concurrency={}, has_task_concurrency_limits={} where 
concurrency IS NULL".format(
-            concurrency, 1 if is_sqlite else sa.true()
+            concurrency, 1 if is_sqlite or is_mssql else sa.true()
         )
     )
 
diff --git 
a/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py 
b/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py
index f063aea..045f5a6 100644
--- a/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py
+++ b/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py
@@ -24,6 +24,8 @@ Create Date: 2019-10-29 13:53:09.445943
 
 """
 
+from collections import defaultdict
+
 from alembic import op
 from sqlalchemy import Column, Integer
 from sqlalchemy.engine.reflection import Inspector
@@ -35,6 +37,64 @@ branch_labels = None
 depends_on = None
 
 
+def get_table_constraints(conn, table_name):
+    """
+    This function returns the primary and unique constraints
+    along with their column names. Some tables, such as `task_instance`,
+    do not declare a primary key constraint name, so SQL Server
+    auto-generates one; this function helps retrieve any primary
+    or unique constraint name for the table.
+
+    :param conn: sql connection object
+    :param table_name: table name
+    :return: a dictionary of ((constraint name, constraint type), column name) 
of table
+    :rtype: defaultdict(list)
+    """
+    query = """SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME
+     FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
+     JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON 
ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
+     WHERE tc.TABLE_NAME = '{table_name}' AND
+     (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 
'UNIQUE')
+    """.format(
+        table_name=table_name
+    )
+    result = conn.execute(query).fetchall()
+    constraint_dict = defaultdict(list)
+    for constraint, constraint_type, column in result:
+        constraint_dict[(constraint, constraint_type)].append(column)
+    return constraint_dict
+
+
+def drop_column_constraints(operator, column_name, constraint_dict):
+    """
+    Drop a primary key or unique constraint
+
+    :param operator: batch_alter_table for the table
+    :param constraint_dict: a dictionary of ((constraint name, constraint 
type), column name) of table
+    """
+    for constraint, columns in constraint_dict.items():
+        if column_name in columns:
+            if constraint[1].lower().startswith("primary"):
+                operator.drop_constraint(constraint[0], type_='primary')
+            elif constraint[1].lower().startswith("unique"):
+                operator.drop_constraint(constraint[0], type_='unique')
+
+
+def create_constraints(operator, column_name, constraint_dict):
+    """
+    Create a primary key or unique constraint
+
+    :param operator: batch_alter_table for the table
+    :param constraint_dict: a dictionary of ((constraint name, constraint 
type), column name) of table
+    """
+    for constraint, columns in constraint_dict.items():
+        if column_name in columns:
+            if constraint[1].lower().startswith("primary"):
+                operator.create_primary_key(constraint_name=constraint[0], 
columns=columns)
+            elif constraint[1].lower().startswith("unique"):
+                
operator.create_unique_constraint(constraint_name=constraint[0], 
columns=columns)
+
+
 def upgrade():
     """Apply Remove id column from xcom"""
     conn = op.get_bind()
@@ -43,9 +103,14 @@ def upgrade():
     with op.batch_alter_table('xcom') as bop:
         xcom_columns = [col.get('name') for col in 
inspector.get_columns("xcom")]
         if "id" in xcom_columns:
+            if conn.dialect.name == 'mssql':
+                constraint_dict = get_table_constraints(conn, "xcom")
+                drop_column_constraints(bop, 'id', constraint_dict)
             bop.drop_column('id')
             bop.drop_index('idx_xcom_dag_task_date')
-            bop.create_primary_key('pk_xcom', ['dag_id', 'task_id', 'key', 
'execution_date'])
+            # mssql doesn't allow primary keys with nullable columns
+            if conn.dialect.name != 'mssql':
+                bop.create_primary_key('pk_xcom', ['dag_id', 'task_id', 'key', 
'execution_date'])
 
 
 def downgrade():
diff --git 
a/airflow/migrations/versions/e9304a3141f0_make_xcom_pkey_columns_non_nullable.py
 
b/airflow/migrations/versions/e9304a3141f0_make_xcom_pkey_columns_non_nullable.py
new file mode 100644
index 0000000..bde065b
--- /dev/null
+++ 
b/airflow/migrations/versions/e9304a3141f0_make_xcom_pkey_columns_non_nullable.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""make xcom pkey columns non-nullable
+
+Revision ID: e9304a3141f0
+Revises: 83f031fd9f1c
+Create Date: 2021-04-06 13:22:02.197726
+
+"""
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mssql, mysql
+
+from airflow.models.base import COLLATION_ARGS
+
+# revision identifiers, used by Alembic.
+revision = 'e9304a3141f0'
+down_revision = '83f031fd9f1c'
+branch_labels = None
+depends_on = None
+
+
+def _use_date_time2(conn):
+    result = conn.execute(
+        """SELECT CASE WHEN CONVERT(VARCHAR(128), SERVERPROPERTY 
('productversion'))
+        like '8%' THEN '2000' WHEN CONVERT(VARCHAR(128), SERVERPROPERTY 
('productversion'))
+        like '9%' THEN '2005' ELSE '2005Plus' END AS MajorVersion"""
+    ).fetchone()
+    mssql_version = result[0]
+    return mssql_version not in ("2000", "2005")
+
+
+def _get_timestamp(conn):
+    dialect_name = conn.dialect.name
+    if dialect_name == "mssql":
+        return mssql.DATETIME2(precision=6) if _use_date_time2(conn) else 
mssql.DATETIME
+    elif dialect_name == "mysql":
+        return mysql.TIMESTAMP(fsp=6, timezone=True)
+    else:
+        return sa.TIMESTAMP(timezone=True)
+
+
+def upgrade():
+    """Apply make xcom pkey columns non-nullable"""
+    conn = op.get_bind()
+    with op.batch_alter_table('xcom') as bop:
+        bop.alter_column("key", type_=sa.String(length=512, **COLLATION_ARGS), 
nullable=False)
+        bop.alter_column("execution_date", type_=_get_timestamp(conn), 
nullable=False)
+        if conn.dialect.name == 'mssql':
+            bop.create_primary_key('pk_xcom', ['dag_id', 'task_id', 'key', 
'execution_date'])
+
+
+def downgrade():
+    """Unapply make xcom pkey columns non-nullable"""
+    conn = op.get_bind()
+    with op.batch_alter_table('xcom') as bop:
+        if conn.dialect.name == 'mssql':
+            bop.drop_constraint('pk_xcom', 'primary')
+        bop.alter_column("key", type_=sa.String(length=512, **COLLATION_ARGS), 
nullable=True)
+        bop.alter_column("execution_date", type_=_get_timestamp(conn), 
nullable=True)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 9f929bf..1cf3797 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -52,6 +52,7 @@ from jinja2.nativetypes import NativeEnvironment
 from sqlalchemy import Boolean, Column, ForeignKey, Index, Integer, String, 
Text, func, or_
 from sqlalchemy.orm import backref, joinedload, relationship
 from sqlalchemy.orm.session import Session
+from sqlalchemy.sql import expression
 
 import airflow.templates
 from airflow import settings, utils
@@ -100,7 +101,7 @@ def get_last_dagrun(dag_id, session, 
include_externally_triggered=False):
     DR = DagRun
     query = session.query(DR).filter(DR.dag_id == dag_id)
     if not include_externally_triggered:
-        query = query.filter(DR.external_trigger == False)  # noqa pylint: 
disable=singleton-comparison
+        query = query.filter(DR.external_trigger == expression.false())
     query = query.order_by(DR.execution_date.desc())
     return query.first()
 
@@ -897,7 +898,9 @@ class DAG(LoggingMixin):
         )
 
         if external_trigger is not None:
-            query = query.filter(DagRun.external_trigger == external_trigger)
+            query = query.filter(
+                DagRun.external_trigger == (expression.true() if 
external_trigger else expression.false())
+            )
 
         return query.scalar()
 
@@ -1872,7 +1875,7 @@ class DAG(LoggingMixin):
             .filter(
                 DagRun.dag_id.in_(existing_dag_ids),
                 DagRun.state == State.RUNNING,  # pylint: 
disable=comparison-with-callable
-                DagRun.external_trigger.is_(False),
+                DagRun.external_trigger == expression.false(),
             )
             .group_by(DagRun.dag_id)
             .all()
@@ -2186,7 +2189,7 @@ class DagModel(Base):
         """
         paused_dag_ids = (
             session.query(DagModel.dag_id)
-            .filter(DagModel.is_paused.is_(True))
+            .filter(DagModel.is_paused == expression.true())
             .filter(DagModel.dag_id.in_(dag_ids))
             .all()
         )
@@ -2270,8 +2273,8 @@ class DagModel(Base):
         query = (
             session.query(cls)
             .filter(
-                cls.is_paused.is_(False),
-                cls.is_active.is_(True),
+                cls.is_paused == expression.false(),
+                cls.is_active == expression.true(),
                 cls.next_dagrun_create_after <= func.now(),
             )
             .order_by(cls.next_dagrun_create_after)
diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py
index ce65fc2..4295a5b 100644
--- a/airflow/models/dagcode.py
+++ b/airflow/models/dagcode.py
@@ -20,7 +20,8 @@ import struct
 from datetime import datetime
 from typing import Iterable, List, Optional
 
-from sqlalchemy import BigInteger, Column, String, Text, exists
+from sqlalchemy import BigInteger, Column, String, Text
+from sqlalchemy.sql.expression import literal
 
 from airflow.exceptions import AirflowException, DagCodeNotFound
 from airflow.models.base import Base
@@ -147,7 +148,7 @@ class DagCode(Base):
         :param session: ORM Session
         """
         fileloc_hash = cls.dag_fileloc_hash(fileloc)
-        return session.query(exists().where(cls.fileloc_hash == 
fileloc_hash)).scalar()
+        return session.query(literal(True)).filter(cls.fileloc_hash == 
fileloc_hash).one_or_none() is not None
 
     @classmethod
     def get_code_by_fileloc(cls, fileloc: str) -> str:
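
The DagCode.has_dag() change above replaces the exists().where(...) subquery with a literal(True) row probe, a form that also works on MSSQL. A minimal, self-contained sketch of the pattern (toy model and in-memory SQLite assumed; DagCodeStub is an illustrative stand-in, not Airflow's model):

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql.expression import literal

Base = declarative_base()


class DagCodeStub(Base):  # hypothetical stand-in for airflow.models.DagCode
    __tablename__ = "dag_code_stub"
    fileloc_hash = Column(Integer, primary_key=True)
    fileloc = Column(String(2000))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

fileloc_hash = 42
has_dag = (
    session.query(literal(True))
    .filter(DagCodeStub.fileloc_hash == fileloc_hash)
    .one_or_none()
    is not None
)
print(has_dag)  # False on the empty table
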
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 6bd96f3..e18351d 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -35,6 +35,7 @@ from sqlalchemy.exc import IntegrityError
 from sqlalchemy.ext.declarative import declared_attr
 from sqlalchemy.orm import backref, relationship, synonym
 from sqlalchemy.orm.session import Session
+from sqlalchemy.sql import expression
 
 from airflow import settings
 from airflow.configuration import conf as airflow_conf
@@ -211,8 +212,8 @@ class DagRun(Base, LoggingMixin):
                 DagModel.dag_id == cls.dag_id,
             )
             .filter(
-                DagModel.is_paused.is_(False),
-                DagModel.is_active.is_(True),
+                DagModel.is_paused == expression.false(),
+                DagModel.is_active == expression.true(),
             )
             .order_by(
                 nulls_first(cls.last_scheduling_decision, session=session),
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 81448b6..d0db6e8 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -26,8 +26,7 @@ from typing import Any, Dict, List, Optional
 import sqlalchemy_jsonfield
 from sqlalchemy import BigInteger, Column, Index, String, and_
 from sqlalchemy.orm import Session, backref, foreign, relationship
-from sqlalchemy.sql import exists
-from sqlalchemy.sql.expression import func
+from sqlalchemy.sql.expression import func, literal
 
 from airflow.models.base import ID_LEN, Base
 from airflow.models.dag import DAG, DagModel
@@ -117,14 +116,20 @@ class SerializedDagModel(Base):
         # If Yes, does nothing
         # If No or the DAG does not exists, updates / writes Serialized DAG to 
DB
         if min_update_interval is not None:
-            if session.query(
-                exists().where(
+            if (
+                session.query(literal(True))
+                .filter(
                     and_(
                         cls.dag_id == dag.dag_id,
                         (timezone.utcnow() - 
timedelta(seconds=min_update_interval)) < cls.last_updated,
                     )
                 )
-            ).scalar():
+                .first()
+                is not None
+            ):
+                # TODO: .first() is not None can be changed to .scalar() once 
we update to sqlalchemy 1.4+
+                # as the associated sqlalchemy bug for MySQL was fixed
+                # related issue : 
https://github.com/sqlalchemy/sqlalchemy/issues/5481
                 return False
 
         log.debug("Checking if DAG (%s) changed", dag.dag_id)
@@ -217,7 +222,7 @@ class SerializedDagModel(Base):
         :param dag_id: the DAG to check
         :param session: ORM Session
         """
-        return session.query(exists().where(cls.dag_id == dag_id)).scalar()
+        return session.query(literal(True)).filter(cls.dag_id == 
dag_id).first() is not None
 
     @classmethod
     @provide_session
@@ -313,7 +318,9 @@ class SerializedDagModel(Base):
         if session.bind.dialect.name in ["sqlite", "mysql"]:
             for row in session.query(cls.dag_id, func.json_extract(cls.data, 
"$.dag.dag_dependencies")).all():
                 dependencies[row[0]] = [DagDependency(**d) for d in 
json.loads(row[1])]
-
+        elif session.bind.dialect.name == "mssql":
+            for row in session.query(cls.dag_id, func.json_query(cls.data, 
"$.dag.dag_dependencies")).all():
+                dependencies[row[0]] = [DagDependency(**d) for d in 
json.loads(row[1])]
         else:
             for row in session.query(
                 cls.dag_id, func.json_extract_path(cls.data, "dag", 
"dag_dependencies")
diff --git a/airflow/sensors/smart_sensor.py b/airflow/sensors/smart_sensor.py
index 6c17beb..9d0a28c 100644
--- a/airflow/sensors/smart_sensor.py
+++ b/airflow/sensors/smart_sensor.py
@@ -391,23 +391,34 @@ class SmartSensorOperator(BaseOperator, SkipMixin):
         :param session: The sqlalchemy session.
         """
         TI = TaskInstance
-        ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in 
sensor_works]
 
-        def update_ti_hostname_with_count(count, ti_keys):
+        def update_ti_hostname_with_count(count, sensor_works):
             # Using or_ instead of in_ here to prevent from full table scan.
-            tis = (
-                session.query(TI)
-                .filter(or_(tuple_(TI.dag_id, TI.task_id, TI.execution_date) 
== ti_key for ti_key in ti_keys))
-                .all()
-            )
+            if session.bind.dialect.name == 'mssql':
+                ti_filter = or_(
+                    and_(
+                        TI.dag_id == ti_key.dag_id,
+                        TI.task_id == ti_key.task_id,
+                        TI.execution_date == ti_key.execution_date,
+                    )
+                    for ti_key in sensor_works
+                )
+            else:
+                ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in 
sensor_works]
+                ti_filter = or_(
+                    tuple_(TI.dag_id, TI.task_id, TI.execution_date) == ti_key 
for ti_key in ti_keys
+                )
+            tis = session.query(TI).filter(ti_filter).all()
 
             for ti in tis:
                 ti.hostname = self.hostname
             session.commit()
 
-            return count + len(ti_keys)
+            return count + len(sensor_works)
 
-        count = helpers.reduce_in_chunks(update_ti_hostname_with_count, 
ti_keys, 0, self.max_tis_per_query)
+        count = helpers.reduce_in_chunks(
+            update_ti_hostname_with_count, sensor_works, 0, 
self.max_tis_per_query
+        )
         if count:
             self.log.info("Updated hostname on %s tis.", count)
 
diff --git a/airflow/www/security.py b/airflow/www/security.py
index 8768587..9b391f7 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -378,6 +378,14 @@ class AirflowSecurityManager(SecurityManager, 
LoggingMixin):  # pylint: disable=
             return True
         return resource_name.startswith(permissions.RESOURCE_DAG_PREFIX)
 
+    def _has_view_access(self, user, action, resource) -> bool:
+        """
+        Override the parent method to ensure that it always returns a bool.
+        _has_view_access can return None, which causes problems
+        later on; coercing the result to a bool fixes that.
+        """
+        return bool(super()._has_view_access(user, action, resource))
+
     def has_access(self, permission, resource, user=None) -> bool:
         """
         Verify whether a given user could perform certain permission
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 78dca2f..e90ec6b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -76,7 +76,7 @@ from jinja2.utils import htmlsafe_json_dumps, pformat  # 
type: ignore
 from pendulum.datetime import DateTime
 from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter  # noqa pylint: 
disable=no-name-in-module
-from sqlalchemy import and_, desc, func, or_, union_all
+from sqlalchemy import Date, and_, desc, func, or_, union_all
 from sqlalchemy.orm import joinedload
 from wtforms import SelectField, validators
 from wtforms.validators import InputRequired
@@ -2090,6 +2090,14 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: 
disable=too-many-public-m
     @action_logging  # pylint: disable=too-many-locals
     def calendar(self):
         """Get DAG runs as calendar"""
+
+        def _convert_to_date(session, column):
+            """Convert column to date."""
+            if session.bind.dialect.name == 'mssql':
+                return column.cast(Date)
+            else:
+                return func.date(column)
+
         dag_id = request.args.get('dag_id')
         dag = current_app.dag_bag.get_dag(dag_id)
         if not dag:
@@ -2103,13 +2111,13 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: 
disable=too-many-public-m
         with create_session() as session:
             dag_states = (
                 session.query(
-                    func.date(DagRun.execution_date).label('date'),
+                    (_convert_to_date(session, 
DagRun.execution_date)).label('date'),
                     DagRun.state,
                     func.count('*').label('count'),
                 )
                 .filter(DagRun.dag_id == dag.dag_id)
-                .group_by(func.date(DagRun.execution_date), DagRun.state)
-                .order_by(func.date(DagRun.execution_date).asc())
+                .group_by(_convert_to_date(session, DagRun.execution_date), 
DagRun.state)
+                .order_by(_convert_to_date(session, 
DagRun.execution_date).asc())
                 .all()
             )
 
diff --git a/breeze b/breeze
index 50f9e55..a838d65 100755
--- a/breeze
+++ b/breeze
@@ -3069,6 +3069,9 @@ function breeze::read_saved_environment_variables() {
     MYSQL_VERSION="${MYSQL_VERSION:=$(parameters::read_from_file 
MYSQL_VERSION)}"
     MYSQL_VERSION=${MYSQL_VERSION:=${_breeze_default_mysql_version}}
 
+    MSSQL_VERSION="${MSSQL_VERSION:=$(parameters::read_from_file 
MSSQL_VERSION)}"
+    MSSQL_VERSION=${MSSQL_VERSION:=${_breeze_default_mssql_version}}
+
     # Here you read DockerHub user/account that you use
     # You can populate your own images in DockerHub this way and work with them.
     # You can override it with "--dockerhub-user" option and it will be stored 
in .build directory
@@ -3102,6 +3105,7 @@ function breeze::read_saved_environment_variables() {
 #     EXECUTOR
 #     POSTGRES_VERSION
 #     MYSQL_VERSION
+#     MSSQL_VERSION
 #     DOCKERHUB_USER
 #     DOCKERHUB_REPO
 #
@@ -3135,6 +3139,7 @@ function breeze::check_and_save_all_params() {
     parameters::check_and_save_allowed_param "EXECUTOR" "Executors" 
"--executor"
     parameters::check_and_save_allowed_param "POSTGRES_VERSION" "Postgres 
version" "--postgres-version"
     parameters::check_and_save_allowed_param "MYSQL_VERSION" "Mysql version" 
"--mysql-version"
+    parameters::check_and_save_allowed_param "MSSQL_VERSION" "MSSql version" 
"--mssql-version"
     parameters::check_and_save_allowed_param "GITHUB_REGISTRY" "GitHub 
Registry" "--github-registry"
 
     parameters::check_allowed_param TEST_TYPE "Type of tests" "--test-type"
@@ -3156,6 +3161,7 @@ function breeze::check_and_save_all_params() {
 #     WEBSERVER_HOST_PORT
 #     POSTGRES_HOST_PORT
 #     MYSQL_HOST_PORT
+#     MSSQL_HOST_PORT
 #
 
#######################################################################################################
 function breeze::print_cheatsheet() {
@@ -3188,6 +3194,7 @@ function breeze::print_cheatsheet() {
         echo "     * ${FLOWER_HOST_PORT} -> forwarded to Flower dashboard -> 
airflow:5555"
         echo "     * ${POSTGRES_HOST_PORT} -> forwarded to Postgres database 
-> postgres:5432"
         echo "     * ${MYSQL_HOST_PORT} -> forwarded to MySQL database  -> 
mysql:3306"
+        echo "     * ${MSSQL_HOST_PORT} -> forwarded to MSSQL database  -> 
mssql:1443"
         echo "     * ${REDIS_HOST_PORT} -> forwarded to Redis broker -> 
redis:6379"
         echo
         echo "   Here are links to those services that you can use on host:"
diff --git a/breeze-complete b/breeze-complete
index 6126ca4..b0985eb 100644
--- a/breeze-complete
+++ b/breeze-complete
@@ -24,7 +24,7 @@
 # Those cannot be made read-only as the breeze-complete must be re-sourceable
 
 _breeze_allowed_python_major_minor_versions="3.6 3.7 3.8"
-_breeze_allowed_backends="sqlite mysql postgres"
+_breeze_allowed_backends="sqlite mysql postgres mssql"
 _breeze_allowed_integrations="cassandra kerberos mongo openldap pinot rabbitmq 
redis statsd trino all"
 _breeze_allowed_generate_constraints_modes="source-providers pypi-providers 
no-providers"
 # registrys is good here even if it is not correct english. We are adding s 
automatically to all variables
@@ -34,6 +34,7 @@ _breeze_allowed_kubernetes_versions="v1.20.2 v1.19.7 v1.18.15"
 _breeze_allowed_helm_versions="v3.2.4"
 _breeze_allowed_kind_versions="v0.10.0"
 _breeze_allowed_mysql_versions="5.7 8"
+_breeze_allowed_mssql_versions="2017-latest 2019-latest"
 _breeze_allowed_postgres_versions="9.6 10 11 12 13"
 _breeze_allowed_kind_operations="start stop restart status deploy test shell 
k9s"
 _breeze_allowed_executors="KubernetesExecutor CeleryExecutor LocalExecutor 
CeleryKubernetesExecutor"
@@ -54,6 +55,7 @@ _breeze_allowed_installation_methods=". apache-airflow"
     _breeze_default_executor=$(echo "${_breeze_allowed_executors}" | awk 
'{print $1}')
     _breeze_default_postgres_version=$(echo 
"${_breeze_allowed_postgres_versions}" | awk '{print $1}')
     _breeze_default_mysql_version=$(echo "${_breeze_allowed_mysql_versions}" | 
awk '{print $1}')
+    _breeze_default_mssql_version=$(echo "${_breeze_allowed_mssql_versions}" | 
awk '{print $1}')
     _breeze_default_test_type=$(echo "${_breeze_allowed_test_types}" | awk 
'{print $1}')
     _breeze_default_package_format=$(echo "${_breeze_allowed_package_formats}" 
| awk '{print $1}')
 }
@@ -175,7 +177,7 @@ verbose assume-yes assume-no assume-quit 
forward-credentials init-script:
 force-build-images force-pull-images force-pull-base-python-image 
production-image extras: force-clean-images skip-rebuild-check
 build-cache-local build-cache-pulled build-cache-disabled disable-pip-cache
 dockerhub-user: dockerhub-repo: use-github-registry github-registry: 
github-repository: github-image-id: generate-constraints-mode:
-postgres-version: mysql-version:
+postgres-version: mysql-version: mssql-version:
 version-suffix-for-pypi: version-suffix-for-svn:
 additional-extras: additional-python-deps: additional-dev-deps: 
additional-runtime-deps: image-tag:
 disable-mysql-client-installation constraints-location: disable-pip-cache 
install-from-docker-context-files
@@ -287,6 +289,9 @@ function breeze_complete::get_known_values_breeze() {
     --mysql-version)
         _breeze_known_values=${_breeze_allowed_mysql_versions}
         ;;
+    --mssql-version)
+        _breeze_known_values=${_breeze_allowed_mssql_versions}
+        ;;
     -D | --dockerhub-user)
         _breeze_known_values="${_breeze_default_dockerhub_user}"
         ;;
diff --git a/docs/apache-airflow/installation.rst 
b/docs/apache-airflow/installation.rst
index e8fb3e7..ace814e 100644
--- a/docs/apache-airflow/installation.rst
+++ b/docs/apache-airflow/installation.rst
@@ -65,6 +65,7 @@ Airflow is tested with:
   * PostgreSQL:  9.6, 10, 11, 12, 13
   * MySQL: 5.7, 8
   * SQLite: 3.15.0+
+  * MSSQL (Experimental): 2017, 2019
 
 * Kubernetes: 1.18.15 1.19.7 1.20.2
 
diff --git a/scripts/ci/docker-compose/base.yml 
b/scripts/ci/docker-compose/backend-mssql-port.yml
similarity index 55%
copy from scripts/ci/docker-compose/base.yml
copy to scripts/ci/docker-compose/backend-mssql-port.yml
index eab6425..42fd418 100644
--- a/scripts/ci/docker-compose/base.yml
+++ b/scripts/ci/docker-compose/backend-mssql-port.yml
@@ -17,24 +17,6 @@
 ---
 version: "2.2"
 services:
-  airflow:
-    image: ${AIRFLOW_IMAGE}
-    environment:
-      - USER=root
-      - ADDITIONAL_PATH=~/.local/bin
-      - 
CELERY_BROKER_URLS=amqp://guest:guest@rabbitmq:5672,redis://redis:6379/0
-      - KUBECONFIG=/files/.kube/config
-      - HOST_HOME=${HOME}
-    env_file:
-      - _docker.env
-    volumes:
-      # Pass docker to inside of the container so that Kind and Moto tests can use it.
-      - /var/run/docker.sock:/var/run/docker.sock
-      - /dev/urandom:/dev/random   # Required to get non-blocking entropy source
+  mssql:
     ports:
-      - "${WEBSERVER_HOST_PORT}:8080"
-      - "${FLOWER_HOST_PORT}:5555"
-volumes:
-  sqlite-db-volume:
-  postgres-db-volume:
-  mysql-db-volume:
+      - "${MSSQL_HOST_PORT}:1433"
diff --git a/scripts/ci/docker-compose/base.yml b/scripts/ci/docker-compose/backend-mssql.yml
similarity index 57%
copy from scripts/ci/docker-compose/base.yml
copy to scripts/ci/docker-compose/backend-mssql.yml
index eab6425..93d2da1 100644
--- a/scripts/ci/docker-compose/base.yml
+++ b/scripts/ci/docker-compose/backend-mssql.yml
@@ -18,23 +18,16 @@
 version: "2.2"
 services:
   airflow:
-    image: ${AIRFLOW_IMAGE}
     environment:
-      - USER=root
-      - ADDITIONAL_PATH=~/.local/bin
-      - CELERY_BROKER_URLS=amqp://guest:guest@rabbitmq:5672,redis://redis:6379/0
-      - KUBECONFIG=/files/.kube/config
-      - HOST_HOME=${HOME}
-    env_file:
-      - _docker.env
+      - BACKEND=mssql
+      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=mssql+pyodbc://sa:Airflow123@mssql:1433/master?driver=ODBC+Driver+17+for+SQL+Server
+      - AIRFLOW__CELERY__RESULT_BACKEND=db+mssql+pyodbc://sa:Airflow123@mssql:1433/master?driver=ODBC+Driver+17+for+SQL+Server
+    depends_on:
+      - mssql
+  mssql:
+    image: mcr.microsoft.com/mssql/server:${MSSQL_VERSION}
+    environment:
+      - ACCEPT_EULA=Y
+      - SA_PASSWORD=Airflow123
     volumes:
-      # Pass docker to inside of the container so that Kind and Moto tests can use it.
-      - /var/run/docker.sock:/var/run/docker.sock
-      - /dev/urandom:/dev/random   # Required to get non-blocking entropy source
-    ports:
-      - "${WEBSERVER_HOST_PORT}:8080"
-      - "${FLOWER_HOST_PORT}:5555"
-volumes:
-  sqlite-db-volume:
-  postgres-db-volume:
-  mysql-db-volume:
+      - mssql-db-volume:/var/opt/mssql
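
For a quick manual check that the SA credentials wired into this compose file work, the
sqlcmd client shipped in the 2017/2019 server images can be used once the service is up.
A sketch, assuming a placeholder container name and the SA_PASSWORD defined above:

    docker exec -it <mssql-container> /opt/mssql-tools/bin/sqlcmd \
        -S localhost -U sa -P Airflow123 -Q "SELECT @@VERSION"
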
diff --git a/scripts/ci/docker-compose/base.yml b/scripts/ci/docker-compose/base.yml
index eab6425..a3d1842 100644
--- a/scripts/ci/docker-compose/base.yml
+++ b/scripts/ci/docker-compose/base.yml
@@ -38,3 +38,4 @@ volumes:
   sqlite-db-volume:
   postgres-db-volume:
   mysql-db-volume:
+  mssql-db-volume:
diff --git a/scripts/ci/libraries/_initialization.sh b/scripts/ci/libraries/_initialization.sh
index c734ccd..e824963 100644
--- a/scripts/ci/libraries/_initialization.sh
+++ b/scripts/ci/libraries/_initialization.sh
@@ -23,6 +23,7 @@ CURRENT_KUBERNETES_VERSIONS=()
 CURRENT_KUBERNETES_MODES=()
 CURRENT_POSTGRES_VERSIONS=()
 CURRENT_MYSQL_VERSIONS=()
+CURRENT_MSSQL_VERSIONS=()
 CURRENT_KIND_VERSIONS=()
 CURRENT_HELM_VERSIONS=()
 CURRENT_EXECUTOR=()
@@ -74,6 +75,7 @@ function initialization::initialize_base_variables() {
     export WEBSERVER_HOST_PORT=${WEBSERVER_HOST_PORT:="28080"}
     export POSTGRES_HOST_PORT=${POSTGRES_HOST_PORT:="25433"}
     export MYSQL_HOST_PORT=${MYSQL_HOST_PORT:="23306"}
+    export MSSQL_HOST_PORT=${MSSQL_HOST_PORT:="21433"}
     export FLOWER_HOST_PORT=${FLOWER_HOST_PORT:="25555"}
     export REDIS_HOST_PORT=${REDIS_HOST_PORT:="26379"}
 
@@ -103,6 +105,10 @@ function initialization::initialize_base_variables() {
     CURRENT_MYSQL_VERSIONS+=("5.7" "8")
     export CURRENT_MYSQL_VERSIONS
 
+    # Currently supported versions of MSSQL
+    CURRENT_MSSQL_VERSIONS+=("2017-latest" "2019-latest")
+    export CURRENT_MSSQL_VERSIONS
+
     BACKEND=${BACKEND:="sqlite"}
     export BACKEND
 
@@ -112,6 +118,9 @@ function initialization::initialize_base_variables() {
     # Default MySQL versions
     export MYSQL_VERSION=${MYSQL_VERSION:=${CURRENT_MYSQL_VERSIONS[0]}}
 
+    # Default MSSQL version
+    export MSSQL_VERSION=${MSSQL_VERSION:=${CURRENT_MSSQL_VERSIONS[0]}}
+
     # If set to true, the database will be reset at entry. Works for Postgres and MySQL
     export DB_RESET=${DB_RESET:="false"}
 
@@ -897,6 +906,7 @@ function initialization::make_constants_read_only() {
     readonly CURRENT_KUBERNETES_MODES
     readonly CURRENT_POSTGRES_VERSIONS
     readonly CURRENT_MYSQL_VERSIONS
+    readonly CURRENT_MSSQL_VERSIONS
     readonly CURRENT_KIND_VERSIONS
     readonly CURRENT_HELM_VERSIONS
     readonly CURRENT_EXECUTOR
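
Because the new variables follow the existing ${VAR:=default} pattern, they can
presumably be overridden from the environment before Breeze starts, for example
(illustrative only):

    MSSQL_VERSION=2019-latest MSSQL_HOST_PORT=21433 ./breeze --backend mssql
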
diff --git a/scripts/ci/selective_ci_checks.sh b/scripts/ci/selective_ci_checks.sh
index 7e77edd..9a115a2 100755
--- a/scripts/ci/selective_ci_checks.sh
+++ b/scripts/ci/selective_ci_checks.sh
@@ -103,9 +103,19 @@ function output_all_basic_variables() {
         initialization::ga_output mysql-versions \
             "$(initialization::parameters_to_json "${MYSQL_VERSION}")"
     fi
-
     initialization::ga_output default-mysql-version "${MYSQL_VERSION}"
 
+    if [[ ${FULL_TESTS_NEEDED_LABEL} == "true" ]]; then
+        initialization::ga_output mssql-versions \
+            "$(initialization::parameters_to_json 
"${CURRENT_MSSQL_VERSIONS[@]}")"
+    else
+        initialization::ga_output mssql-versions \
+            "$(initialization::parameters_to_json "${MSSQL_VERSION}")"
+    fi
+    initialization::ga_output default-mssql-version "${MSSQL_VERSION}"
+
+
+
     initialization::ga_output kind-versions \
         "$(initialization::parameters_to_json "${CURRENT_KIND_VERSIONS[@]}")"
     initialization::ga_output default-kind-version "${KIND_VERSION}"
@@ -117,10 +127,12 @@ function output_all_basic_variables() {
     if [[ ${FULL_TESTS_NEEDED_LABEL} == "true" ]]; then
         initialization::ga_output postgres-exclude '[{ "python-version": "3.6" }]'
         initialization::ga_output mysql-exclude '[{ "python-version": "3.7" }]'
+        initialization::ga_output mssql-exclude '[{ "python-version": "3.7" }]'
         initialization::ga_output sqlite-exclude '[{ "python-version": "3.8" }]'
     else
         initialization::ga_output postgres-exclude '[]'
         initialization::ga_output mysql-exclude '[]'
+        initialization::ga_output mssql-exclude '[]'
         initialization::ga_output sqlite-exclude '[]'
     fi
 
diff --git a/scripts/ci/testing/ci_run_airflow_testing.sh b/scripts/ci/testing/ci_run_airflow_testing.sh
index fa8c044..9368b87 100755
--- a/scripts/ci/testing/ci_run_airflow_testing.sh
+++ b/scripts/ci/testing/ci_run_airflow_testing.sh
@@ -96,6 +96,12 @@ function run_all_test_types_in_parallel() {
             # Remove Integration from list of tests to run in parallel
             test_types_to_run="${test_types_to_run//Integration/}"
             run_integration_tests_separately="true"
+            if [[ ${BACKEND} == "mssql" ]]; then
+              # Skip running "Integration" tests for low memory condition for 
mssql
+              run_integration_tests_separately="false"
+            else
+              run_integration_tests_separately="true"
+            fi
         fi
     fi
     set +e
diff --git a/scripts/in_container/check_environment.sh b/scripts/in_container/check_environment.sh
index 22c6fe5..3237bc9 100755
--- a/scripts/in_container/check_environment.sh
+++ b/scripts/in_container/check_environment.sh
@@ -98,6 +98,8 @@ function check_db_backend {
         check_service "PostgreSQL" "run_nc postgres 5432" "${MAX_CHECK}"
     elif [[ ${BACKEND} == "mysql" ]]; then
         check_service "MySQL" "run_nc mysql 3306" "${MAX_CHECK}"
+    elif [[ ${BACKEND} == "mssql" ]]; then
+        check_service "MSSQL" "run_nc mssql 1433" "${MAX_CHECK}"
     elif [[ ${BACKEND} == "sqlite" ]]; then
         return
     else
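
The new branch mirrors the existing Postgres and MySQL probes. Assuming run_nc wraps
netcat as the neighbouring checks suggest, the added health check is roughly equivalent
to running, inside the container:

    nc -zvv mssql 1433
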
diff --git a/tests/bats/breeze/test_breeze_complete.bats b/tests/bats/breeze/test_breeze_complete.bats
index c1dfed1..249a493 100644
--- a/tests/bats/breeze/test_breeze_complete.bats
+++ b/tests/bats/breeze/test_breeze_complete.bats
@@ -239,6 +239,22 @@
   assert_equal "${_breeze_default_mysql_version}" "${MYSQL_VERSION}"
 }
 
+@test "Test allowed MSSQL versions same as CURRENT" {
+  load ../bats_utils
+  #shellcheck source=breeze-complete
+  source "${AIRFLOW_SOURCES}/breeze-complete"
+
+  assert_equal "${_breeze_allowed_mssql_versions}" 
"${CURRENT_MSSQL_VERSIONS[*]}"
+}
+
+@test "Test default MSSQL version same as MSSQL_VERSION" {
+  load ../bats_utils
+  #shellcheck source=breeze-complete
+  source "${AIRFLOW_SOURCES}/breeze-complete"
+
+  assert_equal "${_breeze_default_mssql_version}" "${MSSQL_VERSION}"
+}
+
 @test "Test allowed Postgres versions same as CURRENT" {
   load ../bats_utils
   #shellcheck source=breeze-complete
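
The two new cases follow the pattern of the existing MySQL and Postgres assertions.
They can presumably be run on their own with bats-core installed and AIRFLOW_SOURCES
pointing at the repository root (illustrative invocation):

    bats tests/bats/breeze/test_breeze_complete.bats
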
diff --git a/tests/models/test_renderedtifields.py b/tests/models/test_renderedtifields.py
index d83f542..bd88d5d 100644
--- a/tests/models/test_renderedtifields.py
+++ b/tests/models/test_renderedtifields.py
@@ -172,7 +172,14 @@ class TestRenderedTaskInstanceFields(unittest.TestCase):
         assert rtif_num == len(result)
 
         # Verify old records are deleted and only 'num_to_keep' records are kept
-        with assert_queries_count(expected_query_count):
+        # For other DBs, an extra query is fired in RenderedTaskInstanceFields.delete_old_records
+        expected_query_count_based_on_db = (
+            expected_query_count + 1
+            if session.bind.dialect.name == "mssql" and expected_query_count 
!= 0
+            else expected_query_count
+        )
+
+        with assert_queries_count(expected_query_count_based_on_db):
             RTIF.delete_old_records(task_id=task.task_id, dag_id=task.dag_id, num_to_keep=num_to_keep)
         result = session.query(RTIF).filter(RTIF.dag_id == dag.dag_id, RTIF.task_id == task.task_id).all()
         assert remaining_rtifs == len(result)
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 35c00fe..9c2beb6 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2098,8 +2098,15 @@ class TestRunRawTaskQueriesCount(unittest.TestCase):
                 run_type=DagRunType.SCHEDULED,
                 session=session,
             )
+        # an extra query is fired in RenderedTaskInstanceFields.delete_old_records
+        # for other DBs. delete_old_records is called only when mark_success is False
+        expected_query_count_based_on_db = (
+            expected_query_count + 1
+            if session.bind.dialect.name == "mssql" and expected_query_count > 
0 and not mark_success
+            else expected_query_count
+        )
 
-        with assert_queries_count(expected_query_count):
+        with assert_queries_count(expected_query_count_based_on_db):
             ti._run_raw_task(mark_success=mark_success)
 
     def test_execute_queries_count_store_serialized(self):
@@ -2116,8 +2123,11 @@ class TestRunRawTaskQueriesCount(unittest.TestCase):
                 run_type=DagRunType.SCHEDULED,
                 session=session,
             )
+        # an extra query is fired in RenderedTaskInstanceFields.delete_old_records
+        # for other DBs
+        expected_query_count_based_on_db = 13 if session.bind.dialect.name == "mssql" else 12
 
-        with assert_queries_count(12):
+        with assert_queries_count(expected_query_count_based_on_db):
             ti._run_raw_task()
 
     def test_operator_field_with_serialization(self):
diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py
index e608ff1..601dc6f 100644
--- a/tests/utils/test_db.py
+++ b/tests/utils/test_db.py
@@ -68,6 +68,12 @@ class TestDb(unittest.TestCase):
             lambda t: (t[0] == 'remove_index' and t[1].name == 'permission_view_id'),
             # from test_security unit test
             lambda t: (t[0] == 'remove_table' and t[1].name == 'some_model'),
+            # MSSQL default tables
+            lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_monitor'),
+            lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_db'),
+            lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_usg'),
+            lambda t: (t[0] == 'remove_table' and t[1].name == 'MSreplication_options'),
+            lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_dev'),
         ]
         for ignore in ignores:
             diff = [d for d in diff if not ignore(d)]
