This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 01603f2ac90 fix: prevent SQL keyword parameterization in MySQL
bulk_load_custom (#63530)
01603f2ac90 is described below
commit 01603f2ac904ed9d3d7c39e50d6d549e690d66e5
Author: lif <[email protected]>
AuthorDate: Tue Apr 7 05:13:27 2026 +0800
fix: prevent SQL keyword parameterization in MySQL bulk_load_custom (#63530)
* fix: prevent SQL keyword parameterization in MySQL bulk_load_custom
The bulk_load_custom method was incorrectly passing duplicate_key_handling
and extra_options as parameterized values to cursor.execute(). MySQL drivers
treat parameterized values as data and quote them as string literals,
producing invalid SQL like:
LOAD DATA LOCAL INFILE '/tmp/file' 'IGNORE' INTO TABLE `table` 'FIELDS...'
This was introduced in PR #33328 which changed from string concatenation to
parameterization. However, duplicate_key_handling (IGNORE/REPLACE) and
extra_options are SQL syntax keywords, not data values.
Fixed by interpolating these keywords directly into the SQL statement via
f-string while keeping tmp_file as the sole parameterized value:
LOAD DATA LOCAL INFILE %s IGNORE INTO TABLE `table` FIELDS...
Updated the existing tests and added a parametrized test to verify that both
the IGNORE and REPLACE keywords appear literally in the SQL without being
parameterized.
Closes: #62506
Signed-off-by: majiayu000 <[email protected]>
* fix: address review feedback on test formatting and spacing
Signed-off-by: majiayu000 <[email protected]>
* fix: clean up spacing in bulk_load_custom test assertion
Signed-off-by: majiayu000 <[email protected]>
* fix: apply ruff format to mysql.py for CI compliance
Signed-off-by: majiayu000 <[email protected]>
---------
Signed-off-by: majiayu000 <[email protected]>
Co-authored-by: Jarek Potiuk <[email protected]>
---
.../src/airflow/providers/mysql/hooks/mysql.py | 6 ++--
.../mysql/tests/unit/mysql/hooks/test_mysql.py | 42 ++++++++++++++++------
2 files changed, 36 insertions(+), 12 deletions(-)
diff --git a/providers/mysql/src/airflow/providers/mysql/hooks/mysql.py b/providers/mysql/src/airflow/providers/mysql/hooks/mysql.py
index 40d59732530..718774f6712 100644
--- a/providers/mysql/src/airflow/providers/mysql/hooks/mysql.py
+++ b/providers/mysql/src/airflow/providers/mysql/hooks/mysql.py
@@ -342,8 +342,10 @@ class MySqlHook(DbApiHook):
conn = self.get_conn()
cursor = conn.cursor()
- sql_statement = f"LOAD DATA LOCAL INFILE %s %s INTO TABLE `{table}` %s"
- parameters = (tmp_file, duplicate_key_handling, extra_options)
+ sql_statement = (
+ f"LOAD DATA LOCAL INFILE %s {duplicate_key_handling} INTO TABLE `{table}` {extra_options}"
+ )
+ parameters = (tmp_file,)
cursor.execute(
sql_statement,
parameters,
diff --git a/providers/mysql/tests/unit/mysql/hooks/test_mysql.py b/providers/mysql/tests/unit/mysql/hooks/test_mysql.py
index a0eba90e683..66281e4547b 100644
--- a/providers/mysql/tests/unit/mysql/hooks/test_mysql.py
+++ b/providers/mysql/tests/unit/mysql/hooks/test_mysql.py
@@ -506,14 +506,11 @@ class TestMySqlHook:
IGNORE 1 LINES""",
)
self.cur.execute.assert_called_once_with(
- f"LOAD DATA LOCAL INFILE %s %s INTO TABLE `{table}` %s",
- (
- "/tmp/file",
- "IGNORE",
- """FIELDS TERMINATED BY ';'
- OPTIONALLY ENCLOSED BY '"'
- IGNORE 1 LINES""",
- ),
+ f"LOAD DATA LOCAL INFILE %s IGNORE INTO TABLE `{table}` "
+ "FIELDS TERMINATED BY ';'\n"
+ " OPTIONALLY ENCLOSED BY '\"'\n"
+ " IGNORE 1 LINES",
+ ("/tmp/file",),
)
@mock.patch("airflow.providers.mysql.hooks.mysql.send_sql_hook_lineage")
@@ -527,14 +524,39 @@ class TestMySqlHook:
mock_send_lineage.assert_called_once()
call_kw = mock_send_lineage.call_args.kwargs
assert call_kw["context"] is self.db_hook
- assert call_kw["sql"] == "LOAD DATA LOCAL INFILE %s %s INTO TABLE `table` %s"
- assert call_kw["sql_parameters"] == ("/tmp/file", "IGNORE", "FIELDS TERMINATED BY ';'")
+ assert (
+ call_kw["sql"] == "LOAD DATA LOCAL INFILE %s IGNORE INTO TABLE `table` FIELDS TERMINATED BY ';'"
+ )
+ assert call_kw["sql_parameters"] == ("/tmp/file",)
assert call_kw["cur"] is self.cur
def test_reserved_words(self):
hook = MySqlHook()
assert hook.reserved_words == sqlalchemy.dialects.mysql.reserved_words.RESERVED_WORDS_MYSQL
+ @pytest.mark.parametrize(
+ ("duplicate_key_handling", "extra_options"),
+ [
+ ("IGNORE", "FIELDS TERMINATED BY ','"),
+ ("REPLACE", "FIELDS TERMINATED BY '\\t'"),
+ ],
+ )
+ def test_bulk_load_custom_duplicate_key_not_parameterized(self, duplicate_key_handling, extra_options):
+ """Test that duplicate_key_handling and extra_options appear as SQL keywords, not parameterized values."""
+ self.db_hook.bulk_load_custom(
+ "test_table",
+ "/tmp/test_file",
+ duplicate_key_handling,
+ extra_options,
+ )
+ call_args = self.cur.execute.call_args
+ sql_statement = call_args[0][0]
+ parameters = call_args[0][1]
+
+ assert duplicate_key_handling in sql_statement
+ assert extra_options in sql_statement
+ assert parameters == ("/tmp/test_file",)
+
def test_generate_insert_sql_without_already_escaped_column_name(self):
values = [
"1",