ephraimbuddy commented on code in PR #48234:
URL: https://github.com/apache/airflow/pull/48234#discussion_r2022673727
##########
airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py:
##########
@@ -99,14 +82,151 @@ def upgrade():
sa.UniqueConstraint("dag_id", "version_number",
name="dag_id_v_name_v_number_unique_constraint"),
)
with op.batch_alter_table(
- "dag_code", recreate="always", naming_convention=naming_convention,
copy_from=old_dagcode_table
+ "dag_code",
) as batch_op:
batch_op.drop_constraint("dag_code_pkey", type_="primary")
- batch_op.add_column(
- sa.Column("id", UUIDType(binary=False), primary_key=True),
insert_before="fileloc_hash"
+ batch_op.add_column(sa.Column("id", UUIDType(binary=False)))
+ batch_op.add_column(sa.Column("dag_version_id",
UUIDType(binary=False)))
+ batch_op.add_column(sa.Column("source_code_hash",
sa.String(length=32)))
+ batch_op.add_column(sa.Column("dag_id", StringID()))
+ batch_op.add_column(sa.Column("created_at", TIMESTAMP(),
default=timezone.utcnow))
+
+ with op.batch_alter_table(
+ "serialized_dag",
+ ) as batch_op:
+ batch_op.add_column(sa.Column("id", UUIDType(binary=False)))
+ batch_op.drop_index("idx_fileloc_hash")
+ batch_op.add_column(sa.Column("dag_version_id",
UUIDType(binary=False)))
+ batch_op.add_column(sa.Column("created_at", TIMESTAMP(),
default=timezone.utcnow))
+
+ # Data migration
+ rows = _get_rows("SELECT dag_id FROM serialized_dag", conn)
+
+ stmt = sa.text("""
+ UPDATE serialized_dag
+ SET id = :_id
+ WHERE dag_id = :dag_id AND id IS NULL
+ """)
+
+ for row in rows:
+ id = uuid7()
+ conn.execute(stmt.bindparams(_id=id, dag_id=row.dag_id))
+ id2 = uuid7()
+ # Update dagversion table
+ conn.execute(
+ sa.text("""
+ INSERT INTO dag_version (id, version_number, dag_id, created_at,
last_updated)
+ VALUES (:id, 1, :dag_id, :created_at, :last_updated)
+ """).bindparams(
+ id=id2, dag_id=row.dag_id, created_at=timezone.utcnow(),
last_updated=timezone.utcnow()
+ )
)
+
+ # Update serialized_dag table with dag_version_id where dag_id matches
+ if conn.dialect.name == "mysql":
+ conn.execute(
+ sa.text("""
+ UPDATE serialized_dag sd
+ JOIN dag_version dv ON sd.dag_id = dv.dag_id
+ SET sd.dag_version_id = dv.id,
+ created_at = dv.created_at
+ """)
+ )
+ else:
+ conn.execute(
+ sa.text("""
+ UPDATE serialized_dag
+ SET dag_version_id = dag_version.id,
+ created_at = dag_version.created_at
+ FROM dag_version
+ WHERE serialized_dag.dag_id = dag_version.dag_id
+ """)
+ )
+ # Update dag_code table where fileloc_hash of serialized_dag matches
+ if conn.dialect.name == "mysql":
+ conn.execute(
+ sa.text("""
+ UPDATE dag_code dc
+ JOIN serialized_dag sd ON dc.fileloc_hash = sd.fileloc_hash
+ SET dc.dag_version_id = sd.dag_version_id,
+ dc.created_at = sd.created_at,
+ dc.dag_id = sd.dag_id
+ """)
+ )
+ else:
+ conn.execute(
+ sa.text("""
+ UPDATE dag_code
+ SET dag_version_id = dag_version.id,
+ created_at = serialized_dag.created_at,
+ dag_id = serialized_dag.dag_id
+ FROM serialized_dag, dag_version
+ WHERE dag_code.fileloc_hash = serialized_dag.fileloc_hash
+ AND serialized_dag.dag_version_id = dag_version.id
+ """)
+ )
+
+ # select all rows in serialized_dag where the dag_id is not in dag_code
Review Comment:
Yeah, in Airflow 2, fileloc_hash is the primary key for dag_code. So if a
file defined two dags, only one dag_code record will be stored for that
fileloc, since the hash is the same for both dags
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]