ashb commented on a change in pull request #9973:
URL: https://github.com/apache/airflow/pull/9973#discussion_r598704465
##########
File path: airflow/models/dag.py
##########
@@ -104,7 +105,9 @@ def get_last_dagrun(dag_id, session,
include_externally_triggered=False):
DR = DagRun
query = session.query(DR).filter(DR.dag_id == dag_id)
if not include_externally_triggered:
- query = query.filter(DR.external_trigger == False) # noqa pylint:
disable=singleton-comparison
+ query = query.filter(
+ DR.external_trigger == expression.false()
+ ) # noqa pylint: disable=singleton-comparison
Review comment:
```suggestion
query = query.filter(
DR.external_trigger == expression.false()
)
```
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1585,15 +1586,16 @@ def _create_dag_runs(self, dag_models:
Iterable[DagModel], session: Session) ->
# as DagModel.dag_id and DagModel.next_dagrun
# This list is used to verify if the DagRun already exist so that we
don't attempt to create
# duplicate dag runs
- active_dagruns = (
- session.query(DagRun.dag_id, DagRun.execution_date)
- .filter(
- tuple_(DagRun.dag_id, DagRun.execution_date).in_(
- [(dm.dag_id, dm.next_dagrun) for dm in dag_models]
- )
+
+ filter_dms = [
+ and_(
+ DagRun.dag_id == dm.dag_id,
+ DagRun.execution_date == dm.next_dagrun,
Review comment:
This is a possible performance regression -- if MSSQL doesn't support
this then we might need to look at having a query method specialisation per DB
backend.
##########
File path:
airflow/migrations/versions/8d48763f6d53_add_unique_constraint_to_conn_id.py
##########
@@ -38,9 +38,9 @@ def upgrade():
"""Apply add unique constraint to conn_id and set it as non-nullable"""
try:
with op.batch_alter_table('connection') as batch_op:
+ batch_op.alter_column("conn_id", nullable=False,
existing_type=sa.String(250))
batch_op.create_unique_constraint(constraint_name="unique_conn_id",
columns=["conn_id"])
- batch_op.alter_column("conn_id", nullable=False,
existing_type=sa.String(250))
Review comment:
MSSQL won't let you create a unique constraint while the column is
nullable?
##########
File path:
airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py
##########
@@ -43,9 +100,14 @@ def upgrade():
with op.batch_alter_table('xcom') as bop:
xcom_columns = [col.get('name') for col in
inspector.get_columns("xcom")]
if "id" in xcom_columns:
+ if conn.dialect.name == 'mssql':
+ constraint_dict = get_table_constraints(conn, "xcom")
+ drop_column_constraints(bop, 'id', constraint_dict)
bop.drop_column('id')
bop.drop_index('idx_xcom_dag_task_date')
- bop.create_primary_key('pk_xcom', ['dag_id', 'task_id', 'key',
'execution_date'])
+ # mssql doesn't allow primary keys with nullable columns
+ if conn.dialect.name != 'mssql':
+ bop.create_primary_key('pk_xcom', ['dag_id', 'task_id', 'key',
'execution_date'])
Review comment:
These columns _shouldn't_ be nullable, so let's fix that.
However, since this migration has already been released, this change will
have to stay, and you'll need to create a new migration that makes these cols
non-null and adds the missing primary key for MSSQL.
##########
File path: airflow/models/dagcode.py
##########
@@ -147,7 +148,7 @@ def has_dag(cls, fileloc: str, session=None) -> bool:
:param session: ORM Session
"""
fileloc_hash = cls.dag_fileloc_hash(fileloc)
- return session.query(exists().where(cls.fileloc_hash ==
fileloc_hash)).scalar()
+ return session.query(literal(True)).filter(cls.fileloc_hash ==
fileloc_hash).first() is not None
Review comment:
Why this change? Does SQLA not produce the right SQL for MSSQL in this
query?
Either way:
```suggestion
return session.query(literal(True)).filter(cls.fileloc_hash ==
fileloc_hash).one_or_none() is not None
```
##########
File path: tests/models/test_renderedtifields.py
##########
@@ -172,7 +172,10 @@ def test_delete_old_records(self, rtif_num, num_to_keep,
remaining_rtifs, expect
assert rtif_num == len(result)
# Verify old records are deleted and only 'num_to_keep' records are
kept
- with assert_queries_count(expected_query_count):
+ expected_query_count_based_on_db = (expected_query_count + 1,
expected_query_count)[
+ session.bind.dialect.name in ["postgresql", "sqlite", "mysql"] or
expected_query_count == 0
+ ]
Review comment:
Why does this query count change for MSSQL compared to the other DB backends?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]