This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 01603f2ac90 fix: prevent SQL keyword parameterization in MySQL bulk_load_custom (#63530)
01603f2ac90 is described below

commit 01603f2ac904ed9d3d7c39e50d6d549e690d66e5
Author: lif <[email protected]>
AuthorDate: Tue Apr 7 05:13:27 2026 +0800

    fix: prevent SQL keyword parameterization in MySQL bulk_load_custom (#63530)
    
    * fix: prevent SQL keyword parameterization in MySQL bulk_load_custom
    
    The bulk_load_custom method was incorrectly passing duplicate_key_handling
    and extra_options as parameterized values to cursor.execute(). MySQL drivers
    treat parameterized values as data and quote them as string literals, producing
    invalid SQL like:
    
      LOAD DATA LOCAL INFILE '/tmp/file' 'IGNORE' INTO TABLE `table` 'FIELDS...'
    
    This was introduced in PR #33328 which changed from string concatenation to
    parameterization. However, duplicate_key_handling (IGNORE/REPLACE) and
    extra_options are SQL syntax keywords, not data values.
    
    Fixed by interpolating these keywords directly into the SQL statement via
    f-string while keeping tmp_file as the sole parameterized value:
    
      LOAD DATA LOCAL INFILE %s IGNORE INTO TABLE `table` FIELDS...
    
    Updated existing tests and added parametrized test to verify both IGNORE
    and REPLACE keywords appear literally in SQL without being parameterized.
    
    Closes: #62506
    
    Signed-off-by: majiayu000 <[email protected]>
    
    * fix: address review feedback on test formatting and spacing
    
    Signed-off-by: majiayu000 <[email protected]>
    
    * fix: clean up spacing in bulk_load_custom test assertion
    
    Signed-off-by: majiayu000 <[email protected]>
    
    * fix: apply ruff format to mysql.py for CI compliance
    
    Signed-off-by: majiayu000 <[email protected]>
    
    ---------
    
    Signed-off-by: majiayu000 <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 .../src/airflow/providers/mysql/hooks/mysql.py     |  6 ++--
 .../mysql/tests/unit/mysql/hooks/test_mysql.py     | 42 ++++++++++++++++------
 2 files changed, 36 insertions(+), 12 deletions(-)

diff --git a/providers/mysql/src/airflow/providers/mysql/hooks/mysql.py b/providers/mysql/src/airflow/providers/mysql/hooks/mysql.py
index 40d59732530..718774f6712 100644
--- a/providers/mysql/src/airflow/providers/mysql/hooks/mysql.py
+++ b/providers/mysql/src/airflow/providers/mysql/hooks/mysql.py
@@ -342,8 +342,10 @@ class MySqlHook(DbApiHook):
         conn = self.get_conn()
         cursor = conn.cursor()
 
-        sql_statement = f"LOAD DATA LOCAL INFILE %s %s INTO TABLE `{table}` %s"
-        parameters = (tmp_file, duplicate_key_handling, extra_options)
+        sql_statement = (
+            f"LOAD DATA LOCAL INFILE %s {duplicate_key_handling} INTO TABLE `{table}` {extra_options}"
+        )
+        parameters = (tmp_file,)
         cursor.execute(
             sql_statement,
             parameters,
diff --git a/providers/mysql/tests/unit/mysql/hooks/test_mysql.py b/providers/mysql/tests/unit/mysql/hooks/test_mysql.py
index a0eba90e683..66281e4547b 100644
--- a/providers/mysql/tests/unit/mysql/hooks/test_mysql.py
+++ b/providers/mysql/tests/unit/mysql/hooks/test_mysql.py
@@ -506,14 +506,11 @@ class TestMySqlHook:
             IGNORE 1 LINES""",
         )
         self.cur.execute.assert_called_once_with(
-            f"LOAD DATA LOCAL INFILE %s %s INTO TABLE `{table}` %s",
-            (
-                "/tmp/file",
-                "IGNORE",
-                """FIELDS TERMINATED BY ';'
-            OPTIONALLY ENCLOSED BY '"'
-            IGNORE 1 LINES""",
-            ),
+            f"LOAD DATA LOCAL INFILE %s IGNORE INTO TABLE `{table}` "
+            "FIELDS TERMINATED BY ';'\n"
+            "            OPTIONALLY ENCLOSED BY '\"'\n"
+            "            IGNORE 1 LINES",
+            ("/tmp/file",),
         )
 
     @mock.patch("airflow.providers.mysql.hooks.mysql.send_sql_hook_lineage")
@@ -527,14 +524,39 @@ class TestMySqlHook:
         mock_send_lineage.assert_called_once()
         call_kw = mock_send_lineage.call_args.kwargs
         assert call_kw["context"] is self.db_hook
-        assert call_kw["sql"] == "LOAD DATA LOCAL INFILE %s %s INTO TABLE `table` %s"
-        assert call_kw["sql_parameters"] == ("/tmp/file", "IGNORE", "FIELDS TERMINATED BY ';'")
+        assert (
+            call_kw["sql"] == "LOAD DATA LOCAL INFILE %s IGNORE INTO TABLE `table` FIELDS TERMINATED BY ';'"
+        )
+        assert call_kw["sql_parameters"] == ("/tmp/file",)
         assert call_kw["cur"] is self.cur
 
     def test_reserved_words(self):
         hook = MySqlHook()
         assert hook.reserved_words == sqlalchemy.dialects.mysql.reserved_words.RESERVED_WORDS_MYSQL
 
+    @pytest.mark.parametrize(
+        ("duplicate_key_handling", "extra_options"),
+        [
+            ("IGNORE", "FIELDS TERMINATED BY ','"),
+            ("REPLACE", "FIELDS TERMINATED BY '\\t'"),
+        ],
+    )
+    def test_bulk_load_custom_duplicate_key_not_parameterized(self, duplicate_key_handling, extra_options):
+        """Test that duplicate_key_handling and extra_options appear as SQL keywords, not parameterized values."""
+        self.db_hook.bulk_load_custom(
+            "test_table",
+            "/tmp/test_file",
+            duplicate_key_handling,
+            extra_options,
+        )
+        call_args = self.cur.execute.call_args
+        sql_statement = call_args[0][0]
+        parameters = call_args[0][1]
+
+        assert duplicate_key_handling in sql_statement
+        assert extra_options in sql_statement
+        assert parameters == ("/tmp/test_file",)
+
     def test_generate_insert_sql_without_already_escaped_column_name(self):
         values = [
             "1",

Reply via email to