uranusjr commented on code in PR #49563: URL: https://github.com/apache/airflow/pull/49563#discussion_r2055265525
########## airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py: ########## @@ -84,165 +63,15 @@ def upgrade(): sa.PrimaryKeyConstraint("id", name=op.f("dag_version_pkey")), sa.UniqueConstraint("dag_id", "version_number", name="dag_id_v_name_v_number_unique_constraint"), ) - with op.batch_alter_table( - "dag_code", - ) as batch_op: - batch_op.drop_constraint("dag_code_pkey", type_="primary") - batch_op.add_column(sa.Column("id", UUIDType(binary=False))) - batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False))) - batch_op.add_column(sa.Column("source_code_hash", sa.String(length=32))) - batch_op.add_column(sa.Column("dag_id", StringID())) - batch_op.add_column(sa.Column("created_at", TIMESTAMP(), default=timezone.utcnow)) - - with op.batch_alter_table( - "serialized_dag", - ) as batch_op: - batch_op.add_column(sa.Column("id", UUIDType(binary=False))) - batch_op.drop_index("idx_fileloc_hash") - batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False))) - batch_op.add_column(sa.Column("created_at", TIMESTAMP(), default=timezone.utcnow)) - - # Data migration - rows = _get_rows("SELECT dag_id FROM serialized_dag", conn) - - stmt = sa.text(""" - UPDATE serialized_dag - SET id = :_id - WHERE dag_id = :dag_id AND id IS NULL - """) - - for row in rows: - id = uuid7() - if conn.dialect.name != "postgresql": - id = id.hex - else: - id = str(id) - - conn.execute(stmt.bindparams(_id=id, dag_id=row.dag_id)) - id2 = uuid7() - if conn.dialect.name != "postgresql": - id2 = id2.hex - else: - id2 = str(id2) - # Update dagversion table - conn.execute( - sa.text(""" - INSERT INTO dag_version (id, version_number, dag_id, created_at, last_updated) - VALUES (:id, 1, :dag_id, :created_at, :last_updated) - """).bindparams( - id=id2, dag_id=row.dag_id, created_at=timezone.utcnow(), last_updated=timezone.utcnow() - ) - ) - - # Update serialized_dag table with dag_version_id where dag_id matches - if conn.dialect.name == "mysql": - 
conn.execute( - sa.text(""" - UPDATE serialized_dag sd - JOIN dag_version dv ON sd.dag_id = dv.dag_id - SET sd.dag_version_id = dv.id, - sd.created_at = dv.created_at - """) - ) - else: - conn.execute( - sa.text(""" - UPDATE serialized_dag - SET dag_version_id = dag_version.id, - created_at = dag_version.created_at - FROM dag_version - WHERE serialized_dag.dag_id = dag_version.dag_id - """) - ) - # Update dag_code table where fileloc_hash of serialized_dag matches - if conn.dialect.name == "mysql": - conn.execute( - sa.text(""" - UPDATE dag_code dc - JOIN serialized_dag sd ON dc.fileloc_hash = sd.fileloc_hash - SET dc.dag_version_id = sd.dag_version_id, - dc.created_at = sd.created_at, - dc.dag_id = sd.dag_id - """) - ) - else: - conn.execute( - sa.text(""" - UPDATE dag_code - SET dag_version_id = dag_version.id, - created_at = serialized_dag.created_at, - dag_id = serialized_dag.dag_id - FROM serialized_dag, dag_version - WHERE dag_code.fileloc_hash = serialized_dag.fileloc_hash - AND serialized_dag.dag_version_id = dag_version.id - """) - ) - - # select all rows in serialized_dag where the dag_id is not in dag_code - - stmt = """ - SELECT dag_id, fileloc, fileloc_hash, dag_version_id - FROM serialized_dag - WHERE dag_id NOT IN (SELECT dag_id FROM dag_code) - AND dag_id in (SELECT dag_id FROM dag) - """ - rows = _get_rows(stmt, conn) - # Insert the missing rows from serialized_dag to dag_code - stmt = sa.text(""" - INSERT INTO dag_code (dag_version_id, dag_id, fileloc, fileloc_hash, source_code, last_updated, created_at) - VALUES (:dag_version_id, :dag_id, :fileloc, :fileloc_hash, :source_code, :last_updated, :created_at) - """) - for row in rows: - try: - source_code = DagCode.get_code_from_file(row.fileloc) - except FileNotFoundError: - source_code = "source_code" - conn.execute( - stmt.bindparams( - dag_version_id=row.dag_version_id, - dag_id=row.dag_id, - fileloc=row.fileloc, - fileloc_hash=row.fileloc_hash, - source_code=source_code, - 
last_updated=timezone.utcnow(), - created_at=timezone.utcnow(), - ) - ) - - stmt = "SELECT dag_id, fileloc FROM dag_code" - rows = _get_rows(stmt, conn) - stmt = sa.text(""" - UPDATE dag_code - SET id = :_id, - dag_id = :dag_id, - source_code = :source_code, - source_code_hash = :source_code_hash - WHERE dag_id = :dag_id AND id IS NULL - """) - for row in rows: - id = uuid7() - if conn.dialect.name != "postgresql": - id = id.hex - else: - id = str(id) - try: - source_code = DagCode.get_code_from_file(row.fileloc) - except FileNotFoundError: - source_code = "source_code" - conn.execute( - stmt.bindparams( - _id=id, - source_code_hash=DagCode.dag_source_hash(source_code), - source_code=source_code, - dag_id=row.dag_id, - ) - ) - with op.batch_alter_table("dag_code") as batch_op: - batch_op.alter_column("dag_id", existing_type=StringID(), nullable=False) - batch_op.alter_column("id", existing_type=UUIDType(binary=False), nullable=False) + batch_op.drop_constraint("dag_code_pkey", type_="primary") + batch_op.drop_column("fileloc_hash") + batch_op.add_column(sa.Column("id", UUIDType(binary=False), nullable=False)) batch_op.create_primary_key("dag_code_pkey", ["id"]) - batch_op.alter_column("dag_version_id", existing_type=UUIDType(binary=False), nullable=False) + batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False), nullable=False)) + batch_op.add_column(sa.Column("source_code_hash", sa.String(length=32), nullable=False)) + batch_op.add_column(sa.Column("dag_id", StringID(), nullable=False)) + batch_op.add_column(sa.Column("created_at", TIMESTAMP(), default=timezone.utcnow, nullable=False)) Review Comment: Why can we change the `alter_column` calls to `add_column`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org. For queries about this service, please contact Infrastructure at: users@infra.apache.org