Repository: incubator-airflow Updated Branches: refs/heads/master b0669b532 -> 0d0cc62f4
[AIRFLOW-1452] workaround lock on method Workaround lock on method "has_table" in case mssql is used as storage engine. Closes #2514 from patsak/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0d0cc62f Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0d0cc62f Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0d0cc62f Branch: refs/heads/master Commit: 0d0cc62f49525166bc877606affa5a623ba52c4d Parents: b0669b5 Author: k.privezentsev <konstantin.privezent...@kaspersky.com> Authored: Fri Aug 11 11:47:35 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Fri Aug 11 11:47:42 2017 -0700 ---------------------------------------------------------------------- .../cc1e65623dc7_add_max_tries_column_to_task_instance.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0d0cc62f/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py ---------------------------------------------------------------------- diff --git a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py index 2d5ffc2..b151e0c 100644 --- a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py +++ b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py @@ -29,6 +29,7 @@ from alembic import op import sqlalchemy as sa from airflow import settings from airflow.models import DagBag, TaskInstance +from sqlalchemy.engine.reflection import Inspector BATCH_SIZE = 5000 @@ -39,10 +40,12 @@ def upgrade(): # needed for database that does not create table until migration finishes. # Checking task_instance table exists prevent the error of querying # non-existing task_instance table. - engine = settings.engine - if engine.dialect.has_table(engine, 'task_instance'): + connection = op.get_bind() + inspector = Inspector.from_engine(connection) + tables = inspector.get_table_names() + + if 'task_instance' in tables: # Get current session - connection = op.get_bind() sessionmaker = sa.orm.sessionmaker() session = sessionmaker(bind=connection) dagbag = DagBag(settings.DAGS_FOLDER)