xchwan commented on code in PR #55500:
URL: https://github.com/apache/airflow/pull/55500#discussion_r2447417318
##########
airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py:
##########
@@ -44,35 +44,24 @@ def upgrade():
conn = op.get_bind()
if conn.dialect.name == "mysql":
# TODO: Rewrite these queries to use alembic when lowest MYSQL version
supports IF EXISTS
- conn.execute(
+ # Remove prepared statements for PyMySQL
+ result = conn.execute(
sa.text("""
- set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS
WHERE
- CONSTRAINT_SCHEMA = DATABASE() AND
- TABLE_NAME = 'connection' AND
- CONSTRAINT_NAME = 'unique_conn_id' AND
- CONSTRAINT_TYPE = 'UNIQUE') = true,'ALTER TABLE connection
- DROP INDEX unique_conn_id','select 1');
-
- prepare stmt from @var;
- execute stmt;
- deallocate prepare stmt;
- """)
+ SELECT CONSTRAINT_NAME
+ FROM information_schema.TABLE_CONSTRAINTS
+ WHERE CONSTRAINT_SCHEMA = DATABASE()
+ AND TABLE_NAME = 'connection'
+ AND CONSTRAINT_TYPE = 'UNIQUE';
+ """)
)
- # Dropping the below and recreating cause there's no IF NOT EXISTS in
mysql
- conn.execute(
- sa.text("""
- set @var=if((SELECT true FROM
information_schema.TABLE_CONSTRAINTS WHERE
- CONSTRAINT_SCHEMA = DATABASE() AND
- TABLE_NAME = 'connection' AND
- CONSTRAINT_NAME = 'connection_conn_id_uq' AND
- CONSTRAINT_TYPE = 'UNIQUE') = true,'ALTER TABLE
connection
- DROP INDEX connection_conn_id_uq','select 1');
- prepare stmt from @var;
- execute stmt;
- deallocate prepare stmt;
- """)
- )
+ rows = result.all() if result is not None else []
+ existing_indexes = {row[0] for row in rows}
+ drop_indexes = ["unique_conn_id", "connection_conn_id_uq"]
+ for idx in drop_indexes:
+ if idx in existing_indexes:
+ conn.execute(sa.text(f"ALTER TABLE connection DROP INDEX
{idx}"))
Review Comment:
It work. I run ```breeze shell``` and do ```airflow db migrate -s
--from-version "2.9.0" -n "2.10.0"``` it generate.
```
root@b2a719b87dad:/opt/airflow# airflow db migrate -s --from-version "2.9.0"
-n "2.10.0"
DB: mysql://root@mysql/airflow?charset=utf8mb4
Generating sql for upgrade -- upgrade commands will *not* be submitted.
-- Running upgrade 1949afb29106 -> bff083ad727d
DROP INDEX idx_last_scheduling_decision ON dag_run;
UPDATE alembic_version SET version_num='bff083ad727d' WHERE
alembic_version.version_num = '1949afb29106';
-- Running upgrade bff083ad727d -> 686269002441
SELECT CONSTRAINT_NAME
FROM information_schema.TABLE_CONSTRAINTS
WHERE CONSTRAINT_SCHEMA = DATABASE()
AND TABLE_NAME = 'connection'
AND CONSTRAINT_TYPE = 'UNIQUE';;
ALTER TABLE connection ADD CONSTRAINT connection_conn_id_uq UNIQUE (conn_id);
UPDATE dag SET max_consecutive_failed_dag_runs='0';
ALTER TABLE dag MODIFY max_consecutive_failed_dag_runs INTEGER NOT NULL;
ALTER TABLE task_instance DROP FOREIGN KEY task_instance_dag_run_fkey;
ALTER TABLE task_reschedule DROP FOREIGN KEY task_reschedule_dr_fkey;
SELECT CONSTRAINT_NAME
FROM information_schema.TABLE_CONSTRAINTS
WHERE CONSTRAINT_SCHEMA = DATABASE()
AND TABLE_NAME = 'dag_run'
AND CONSTRAINT_TYPE = 'UNIQUE';;
ALTER TABLE callback_request MODIFY processor_subdir VARCHAR(2000) NULL;
ALTER TABLE dag MODIFY processor_subdir VARCHAR(2000) NULL;
ALTER TABLE import_error MODIFY processor_subdir VARCHAR(2000) NULL;
ALTER TABLE serialized_dag MODIFY processor_subdir VARCHAR(2000) NULL;
ALTER TABLE dag_run ADD CONSTRAINT dag_run_dag_id_execution_date_key UNIQUE
(dag_id, execution_date);
ALTER TABLE dag_run ADD CONSTRAINT dag_run_dag_id_run_id_key UNIQUE (dag_id,
run_id);
ALTER TABLE task_instance ADD CONSTRAINT task_instance_dag_run_fkey FOREIGN
KEY(dag_id, run_id) REFERENCES dag_run (dag_id, run_id) ON DELETE CASCADE;
ALTER TABLE task_reschedule ADD CONSTRAINT task_reschedule_dr_fkey FOREIGN
KEY(dag_id, run_id) REFERENCES dag_run (dag_id, run_id) ON DELETE CASCADE;
UPDATE alembic_version SET version_num='686269002441' WHERE
alembic_version.version_num = 'bff083ad727d';
-- Running upgrade 686269002441 -> 0fd0c178cbe8
CREATE INDEX idx_dag_schedule_dataset_reference_dag_id ON
dag_schedule_dataset_reference (dag_id);
CREATE INDEX idx_dag_tag_dag_id ON dag_tag (dag_id);
CREATE INDEX idx_dag_warning_dag_id ON dag_warning (dag_id);
CREATE INDEX idx_dataset_dag_run_queue_target_dag_id ON
dataset_dag_run_queue (target_dag_id);
CREATE INDEX idx_task_outlet_dataset_reference_dag_id ON
task_outlet_dataset_reference (dag_id);
UPDATE alembic_version SET version_num='0fd0c178cbe8' WHERE
alembic_version.version_num = '686269002441';
-- Running upgrade 0fd0c178cbe8 -> 677fdbb7fc54
ALTER TABLE task_instance ADD COLUMN executor VARCHAR(1000);
UPDATE alembic_version SET version_num='677fdbb7fc54' WHERE
alembic_version.version_num = '0fd0c178cbe8';
-- Running upgrade 677fdbb7fc54 -> c4602ba06b4b
CREATE TABLE dag_priority_parsing_request (
id VARCHAR(32) NOT NULL,
fileloc VARCHAR(2000) NOT NULL,
CONSTRAINT dag_priority_parsing_request_pkey PRIMARY KEY (id)
);
UPDATE alembic_version SET version_num='c4602ba06b4b' WHERE
alembic_version.version_num = '677fdbb7fc54';
-- Running upgrade c4602ba06b4b -> d482b7261ff9
CREATE TABLE task_instance_history (
id INTEGER NOT NULL AUTO_INCREMENT,
task_id VARCHAR(250) COLLATE utf8mb3_bin NOT NULL,
dag_id VARCHAR(250) COLLATE utf8mb3_bin NOT NULL,
run_id VARCHAR(250) COLLATE utf8mb3_bin NOT NULL,
map_index INTEGER NOT NULL DEFAULT -1,
try_number INTEGER NOT NULL,
start_date TIMESTAMP(6) NULL,
end_date TIMESTAMP(6) NULL,
duration FLOAT,
state VARCHAR(20),
max_tries INTEGER DEFAULT -1,
hostname VARCHAR(1000),
unixname VARCHAR(1000),
job_id INTEGER,
pool VARCHAR(256) NOT NULL,
pool_slots INTEGER NOT NULL,
queue VARCHAR(256),
priority_weight INTEGER,
operator VARCHAR(1000),
custom_operator_name VARCHAR(1000),
queued_dttm TIMESTAMP(6) NULL,
queued_by_job_id INTEGER,
pid INTEGER,
executor VARCHAR(1000),
executor_config BLOB,
updated_at TIMESTAMP(6) NULL,
rendered_map_index VARCHAR(250),
external_executor_id VARCHAR(250),
trigger_id INTEGER,
trigger_timeout DATETIME,
next_method VARCHAR(1000),
next_kwargs JSON,
task_display_name VARCHAR(2000),
CONSTRAINT task_instance_history_pkey PRIMARY KEY (id),
CONSTRAINT task_instance_history_ti_fkey FOREIGN KEY(dag_id, task_id,
run_id, map_index) REFERENCES task_instance (dag_id, task_id, run_id,
map_index) ON DELETE CASCADE ON UPDATE CASCADE,
CONSTRAINT task_instance_history_dtrt_uq UNIQUE (dag_id, task_id,
run_id, map_index, try_number)
);
UPDATE alembic_version SET version_num='d482b7261ff9' WHERE
alembic_version.version_num = 'c4602ba06b4b';
-- Running upgrade d482b7261ff9 -> 05e19f3176be
CREATE TABLE dataset_alias (
id INTEGER NOT NULL AUTO_INCREMENT,
name VARCHAR(3000) COLLATE latin1_general_cs NOT NULL,
CONSTRAINT dataset_alias_pkey PRIMARY KEY (id)
);
CREATE UNIQUE INDEX idx_name_unique ON dataset_alias (name);
UPDATE alembic_version SET version_num='05e19f3176be' WHERE
alembic_version.version_num = 'd482b7261ff9';
-- Running upgrade 05e19f3176be -> ec3471c1e067
CREATE TABLE dataset_alias_dataset_event (
alias_id INTEGER NOT NULL,
event_id INTEGER NOT NULL,
CONSTRAINT dataset_alias_dataset_event_pkey PRIMARY KEY (alias_id,
event_id),
CONSTRAINT dataset_alias_dataset_event_alias_id_fkey FOREIGN
KEY(alias_id) REFERENCES dataset_alias (id) ON DELETE CASCADE,
CONSTRAINT dataset_alias_dataset_event_event_id_fkey FOREIGN
KEY(event_id) REFERENCES dataset_event (id) ON DELETE CASCADE
);
CREATE INDEX idx_dataset_alias_dataset_event_alias_id ON
dataset_alias_dataset_event (alias_id);
CREATE INDEX idx_dataset_alias_dataset_event_event_id ON
dataset_alias_dataset_event (event_id);
UPDATE alembic_version SET version_num='ec3471c1e067' WHERE
alembic_version.version_num = '05e19f3176be';
-- Running upgrade ec3471c1e067 -> 41b3bc7c0272
ALTER TABLE log ADD COLUMN try_number INTEGER;
CREATE INDEX idx_log_task_instance ON log (dag_id, task_id, run_id,
map_index, try_number);
UPDATE alembic_version SET version_num='41b3bc7c0272' WHERE
alembic_version.version_num = 'ec3471c1e067';
-- Running upgrade 41b3bc7c0272 -> 8684e37832e6
CREATE TABLE dataset_alias_dataset (
alias_id INTEGER NOT NULL,
dataset_id INTEGER NOT NULL,
CONSTRAINT dataset_alias_dataset_pkey PRIMARY KEY (alias_id,
dataset_id),
CONSTRAINT dataset_alias_dataset_alias_id_fkey FOREIGN KEY(alias_id)
REFERENCES dataset_alias (id) ON DELETE CASCADE,
CONSTRAINT dataset_alias_dataset_dataset_id_fkey FOREIGN KEY(dataset_id)
REFERENCES dataset (id) ON DELETE CASCADE
);
CREATE INDEX idx_dataset_alias_dataset_alias_dataset_id ON
dataset_alias_dataset (dataset_id);
CREATE INDEX idx_dataset_alias_dataset_alias_id ON dataset_alias_dataset
(alias_id);
UPDATE alembic_version SET version_num='8684e37832e6' WHERE
alembic_version.version_num = '41b3bc7c0272';
-- Running upgrade 8684e37832e6 -> 22ed7efa9da2
CREATE TABLE dag_schedule_dataset_alias_reference (
alias_id INTEGER NOT NULL,
dag_id VARCHAR(250) COLLATE utf8mb3_bin NOT NULL,
created_at TIMESTAMP(6) NOT NULL,
updated_at TIMESTAMP(6) NOT NULL,
CONSTRAINT dsdar_pkey PRIMARY KEY (alias_id, dag_id),
CONSTRAINT dsdar_dataset_fkey FOREIGN KEY(alias_id) REFERENCES
dataset_alias (id) ON DELETE CASCADE,
CONSTRAINT dsdar_dag_fkey FOREIGN KEY(dag_id) REFERENCES dag (dag_id) ON
DELETE CASCADE
);
CREATE INDEX idx_dag_schedule_dataset_alias_reference_dag_id ON
dag_schedule_dataset_alias_reference (dag_id);
UPDATE alembic_version SET version_num='22ed7efa9da2' WHERE
alembic_version.version_num = '8684e37832e6';
```
The for loop will generate ALTER TABLE sql to modify table
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]