This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d9075f89e1b6093ec31610637ed74ad8437abb01 Author: Daniel Standish <[email protected]> AuthorDate: Fri May 6 05:42:22 2022 -0700 Only count bad refs when `moved` table exists (#23491) This keeps the logic to fail without upgrading when (A) there are bad rows and (B) the "moved" table already exists. But we optimize so that we don't count the bad rows unless the "moved" table is there. Previously we counted always, but the first time a user attempts upgrade, the tables won't be there so there's no point in counting. Instead what we do is skip right to the CTAS, creating the _airflow_moved tables. If there aren't any rows in the "moved" table, then we delete the table immediately. Also included here is a delete optimization, where we join to the moved table instead of running the not exists query again. Co-authored-by: Jed Cunningham <[email protected]> Co-authored-by: Ash Berlin-Taylor <[email protected]> (cherry picked from commit 6cc41abf6912fd2705b9ef7cf368c888c43c8af8) --- airflow/utils/db.py | 53 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 0e1ada2bbd..7325c0e243 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -26,7 +26,7 @@ from dataclasses import dataclass from tempfile import gettempdir from typing import TYPE_CHECKING, Callable, Iterable, List, Optional, Tuple, Union -from sqlalchemy import Table, and_, column, exc, func, inspect, or_, table, text +from sqlalchemy import Table, and_, column, exc, func, inspect, or_, select, table, text from sqlalchemy.orm.session import Session import airflow @@ -1054,6 +1054,7 @@ def _move_dangling_data_to_new_table( dialect_name = bind.dialect.name # First: Create moved rows from new table + log.debug("running CTAS for table %s", target_table_name) _create_table_as( dialect_name=dialect_name, source_query=source_query, @@ -1061,9 +1062,30 @@ def _move_dangling_data_to_new_table( source_table_name=source_table.name, session=session, ) + session.commit() - delete = source_table.delete().where(~exists_subquery.exists()) - session.execute(delete) + target_table = source_table.to_metadata(source_table.metadata, name=target_table_name) + log.debug("checking whether rows were moved for table %s", target_table_name) + moved_rows_exist_query = select([1]).select_from(target_table).limit(1) + first_moved_row = session.execute(moved_rows_exist_query).all() + session.commit() + + if not first_moved_row: + log.debug("no rows moved; dropping %s", target_table_name) + target_table.drop(bind=session.get_bind(), checkfirst=True) + else: + log.debug("rows moved; purging from %s", source_table.name) + if dialect_name == 'sqlite': + delete = source_table.delete().where(~exists_subquery.exists()) + else: + delete = source_table.delete().where( + and_(col == target_table.c[col.name] for col in source_table.primary_key.columns) + ) + log.debug(delete.compile()) + session.execute(delete) + session.commit() + + log.debug("exiting move function") def _dag_run_exists(session, source_table, dag_run): @@ -1226,6 +1248,7 @@ def check_bad_references(session: Session) -> Iterable[str]: errored = False for model, change_version, bad_ref_cfg in models_list: + log.debug("checking model %s", model.__tablename__) # We can't use the model here since it may differ from the db state due to # this function is run prior to migration. Use the reflected table instead. exists_func_kwargs = {x: metadata.tables[x] for x in bad_ref_cfg.join_tables} @@ -1240,20 +1263,23 @@ def check_bad_references(session: Session) -> Iterable[str]: bad_rows_subquery = bad_ref_cfg.exists_func(session, source_table, **exists_func_kwargs) select_list = [x.label(x.name) for x in source_table.c] invalid_rows_query = session.query(*select_list).filter(~bad_rows_subquery.exists()) - invalid_row_count = invalid_rows_query.count() - if invalid_row_count <= 0: - continue dangling_table_name = _format_airflow_moved_table_name(source_table.name, change_version, 'dangling') if dangling_table_name in existing_table_names: - yield _format_dangling_error( - source_table=source_table.name, - target_table=dangling_table_name, - invalid_count=invalid_row_count, - reason=f"without a corresponding {bad_ref_cfg.ref_table} row", - ) - errored = True + invalid_row_count = invalid_rows_query.count() + if invalid_row_count <= 0: + continue + else: + yield _format_dangling_error( + source_table=source_table.name, + target_table=dangling_table_name, + invalid_count=invalid_row_count, + reason=f"without a corresponding {bad_ref_cfg.ref_table} row", + ) + errored = True continue + + log.debug("moving data for table %s", source_table.name) _move_dangling_data_to_new_table( session, source_table, @@ -1282,6 +1308,7 @@ def _check_migration_errors(session: Session = NEW_SESSION) -> Iterable[str]: check_bad_references, ) for check_fn in check_functions: + log.debug("running check function %s", check_fn.__name__) yield from check_fn(session=session) # Ensure there is no "active" transaction. Seems odd, but without this MSSQL can hang session.commit()
