This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 45dd0c484e Move local_infile option from extra to hook parameter 
(#28811)
45dd0c484e is described below

commit 45dd0c484e16ff56800cc9c047f56b4a909d2d0d
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Jan 11 09:47:43 2023 +0100

    Move local_infile option from extra to hook parameter (#28811)
    
    This change moves the local_infile parameter from the connection
    extra to the Hook. Since this feature is only used in very specific
    cases, it belongs to the "action" being executed rather than to the
    connection in general. For example, in the Hive and Vertica transfers
    the local_infile capability is simply enabled by the bulk_load
    parameter, which allows the same connection to be used in both cases.
---
 airflow/providers/apache/hive/transfers/hive_to_mysql.py | 12 ++++++------
 airflow/providers/mysql/CHANGELOG.rst                    |  9 +++++++++
 airflow/providers/mysql/hooks/mysql.py                   | 10 +++++++---
 airflow/providers/mysql/provider.yaml                    |  1 +
 airflow/providers/mysql/transfers/vertica_to_mysql.py    |  7 +++----
 .../apache-airflow-providers-mysql/connections/mysql.rst | 16 +++-------------
 .../apache/hive/transfers/test_hive_to_mysql.py          |  5 ++++-
 tests/providers/mysql/hooks/test_mysql.py                |  8 ++++----
 8 files changed, 37 insertions(+), 31 deletions(-)

diff --git a/airflow/providers/apache/hive/transfers/hive_to_mysql.py 
b/airflow/providers/apache/hive/transfers/hive_to_mysql.py
index b1a3669d71..6d048d7b73 100644
--- a/airflow/providers/apache/hive/transfers/hive_to_mysql.py
+++ b/airflow/providers/apache/hive/transfers/hive_to_mysql.py
@@ -50,9 +50,9 @@ class HiveToMySqlOperator(BaseOperator):
         import, typically used to move data from staging to
         production and issue cleanup commands. (templated)
     :param bulk_load: flag to use bulk_load option.  This loads mysql directly
-        from a tab-delimited text file using the LOAD DATA LOCAL INFILE 
command.
-        This option requires an extra connection parameter for the
-        destination MySQL connection: {'local_infile': true}.
+        from a tab-delimited text file using the LOAD DATA LOCAL INFILE 
command. The MySQL
+        server must support loading local files via this command (it is 
disabled by default).
+
     :param hive_conf:
     """
 
@@ -105,7 +105,7 @@ class HiveToMySqlOperator(BaseOperator):
                     output_header=False,
                     hive_conf=hive_conf,
                 )
-                mysql = self._call_preoperator()
+                mysql = self._call_preoperator(local_infile=self.bulk_load)
                 mysql.bulk_load(table=self.mysql_table, tmp_file=tmp_file.name)
         else:
             hive_results = hive.get_records(self.sql, parameters=hive_conf)
@@ -118,8 +118,8 @@ class HiveToMySqlOperator(BaseOperator):
 
         self.log.info("Done.")
 
-    def _call_preoperator(self):
-        mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+    def _call_preoperator(self, local_infile: bool = False) -> MySqlHook:
+        mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id, 
local_infile=local_infile)
         if self.mysql_preoperator:
             self.log.info("Running MySQL preoperator")
             mysql.run(self.mysql_preoperator)
diff --git a/airflow/providers/mysql/CHANGELOG.rst 
b/airflow/providers/mysql/CHANGELOG.rst
index 1ba558bb53..66e4742628 100644
--- a/airflow/providers/mysql/CHANGELOG.rst
+++ b/airflow/providers/mysql/CHANGELOG.rst
@@ -27,6 +27,15 @@ used with MySQL server 5.6.4 through 5.7.
 Changelog
 ---------
 
+4.0.0
+.....
+
+Breaking Changes
+~~~~~~~~~~~~~~~~
+
+You can no longer pass "local_infile" (or "allow_local_infile" for mysql-connector-python) as extra in the connection. You should 
pass it instead as
+the hook's "local_infile" parameter when you create the MySqlHook (either directly 
or via hook_params).
+
 3.4.0
 .....
 
diff --git a/airflow/providers/mysql/hooks/mysql.py 
b/airflow/providers/mysql/hooks/mysql.py
index 843ac0b8aa..83a25f1aed 100644
--- a/airflow/providers/mysql/hooks/mysql.py
+++ b/airflow/providers/mysql/hooks/mysql.py
@@ -45,8 +45,12 @@ class MySqlHook(DbApiHook):
     in extras.
     extras example: ``{"iam":true, "aws_conn_id":"my_aws_conn"}``
 
+    You can also add "local_infile" parameter to determine whether 
local_infile feature of MySQL client is
+    going to be enabled (it is disabled by default).
+
     :param schema: The MySQL database schema to connect to.
     :param connection: The :ref:`MySQL connection id <howto/connection:mysql>` 
used for MySQL credentials.
+    :param local_infile: Boolean flag determining if local_infile should be 
used
     """
 
     conn_name_attr = "mysql_conn_id"
@@ -59,6 +63,7 @@ class MySqlHook(DbApiHook):
         super().__init__(*args, **kwargs)
         self.schema = kwargs.pop("schema", None)
         self.connection = kwargs.pop("connection", None)
+        self.local_infile = kwargs.pop("local_infile", False)
 
     def set_autocommit(self, conn: MySQLConnectionTypes, autocommit: bool) -> 
None:
         """
@@ -118,7 +123,6 @@ class MySqlHook(DbApiHook):
                 conn_config["cursorclass"] = MySQLdb.cursors.DictCursor
             elif (conn.extra_dejson["cursor"]).lower() == "ssdictcursor":
                 conn_config["cursorclass"] = MySQLdb.cursors.SSDictCursor
-        local_infile = conn.extra_dejson.get("local_infile", False)
         if conn.extra_dejson.get("ssl", False):
             # SSL parameter for MySQL has to be a dictionary and in case
             # of extra/dejson we can get string if extra is passed via
@@ -131,7 +135,7 @@ class MySqlHook(DbApiHook):
             conn_config["ssl_mode"] = conn.extra_dejson["ssl_mode"]
         if conn.extra_dejson.get("unix_socket"):
             conn_config["unix_socket"] = conn.extra_dejson["unix_socket"]
-        if local_infile:
+        if self.local_infile:
             conn_config["local_infile"] = 1
         return conn_config
 
@@ -144,7 +148,7 @@ class MySqlHook(DbApiHook):
             "port": int(conn.port) if conn.port else 3306,
         }
 
-        if conn.extra_dejson.get("allow_local_infile", False):
+        if self.local_infile:
             conn_config["allow_local_infile"] = True
         # Ref: 
https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html
         for key, value in conn.extra_dejson.items():
diff --git a/airflow/providers/mysql/provider.yaml 
b/airflow/providers/mysql/provider.yaml
index 28d3fea687..d85abecf33 100644
--- a/airflow/providers/mysql/provider.yaml
+++ b/airflow/providers/mysql/provider.yaml
@@ -22,6 +22,7 @@ description: |
     `MySQL <https://www.mysql.com/products/>`__
 
 versions:
+  - 4.0.0
   - 3.4.0
   - 3.3.0
   - 3.2.1
diff --git a/airflow/providers/mysql/transfers/vertica_to_mysql.py 
b/airflow/providers/mysql/transfers/vertica_to_mysql.py
index a7df1f029d..5f2274c6b4 100644
--- a/airflow/providers/mysql/transfers/vertica_to_mysql.py
+++ b/airflow/providers/mysql/transfers/vertica_to_mysql.py
@@ -49,9 +49,8 @@ class VerticaToMySqlOperator(BaseOperator):
         import, typically used to move data from staging to production
         and issue cleanup commands. (templated)
     :param bulk_load: flag to use bulk_load option.  This loads MySQL directly
-        from a tab-delimited text file using the LOAD DATA LOCAL INFILE 
command.
-        This option requires an extra connection parameter for the
-        destination MySQL connection: {'local_infile': true}.
+        from a tab-delimited text file using the LOAD DATA LOCAL INFILE 
command. The MySQL
+        server must support loading local files via this command (it is 
disabled by default).
     """
 
     template_fields: Sequence[str] = ("sql", "mysql_table", 
"mysql_preoperator", "mysql_postoperator")
@@ -86,7 +85,7 @@ class VerticaToMySqlOperator(BaseOperator):
 
     def execute(self, context: Context):
         vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id)
-        mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+        mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id, 
local_infile=self.bulk_load)
 
         if self.bulk_load:
             self._bulk_load_transfer(mysql, vertica)
diff --git a/docs/apache-airflow-providers-mysql/connections/mysql.rst 
b/docs/apache-airflow-providers-mysql/connections/mysql.rst
index 95d8e7aaba..e8b8091b83 100644
--- a/docs/apache-airflow-providers-mysql/connections/mysql.rst
+++ b/docs/apache-airflow-providers-mysql/connections/mysql.rst
@@ -46,9 +46,6 @@ Extra (optional)
       * ``charset``: specify charset of the connection
       * ``cursor``: one of ``sscursor``, ``dictcursor``, ``ssdictcursor`` . 
Specifies cursor class to be
         used
-      * ``local_infile``: controls MySQL's LOCAL capability (permitting local 
data loading by
-        clients). See `MySQLdb docs 
<https://mysqlclient.readthedocs.io/user_guide.html>`_
-        for details.
       * ``unix_socket``: UNIX socket used instead of the default socket.
       * ``ssl``: Dictionary of SSL parameters that control connecting using 
SSL. Those
         parameters are server specific and should contain ``ca``, ``cert``, 
``key``, ``capath``,
@@ -99,14 +96,7 @@ Extra (optional)
           If encounter UnicodeDecodeError while working with MySQL connection, 
check
           the charset defined is matched to the database charset.
 
-    For ``mysql-connector-python`` the following extras are supported:
+    For ``mysql-connector-python`` no extras are supported.
 
-      * ``allow_local_infile``: Whether to enable ``LOAD DATA LOCAL INFILE`` 
capability.
-
-      Example "extras" field:
-
-      .. code-block:: json
-
-         {
-            "allow_local_infile": true
-         }
+In both cases, when you want to use ``LOAD DATA LOCAL INFILE`` SQL commands of 
MySQL, you need to create the
+Hook with "local_infile" parameter set to True.
diff --git a/tests/providers/apache/hive/transfers/test_hive_to_mysql.py 
b/tests/providers/apache/hive/transfers/test_hive_to_mysql.py
index d8f39812b2..5fefc2594e 100644
--- a/tests/providers/apache/hive/transfers/test_hive_to_mysql.py
+++ b/tests/providers/apache/hive/transfers/test_hive_to_mysql.py
@@ -49,7 +49,9 @@ class TestHiveToMySqlTransfer(TestHiveEnvironment):
 
         
mock_hive_hook.assert_called_once_with(hiveserver2_conn_id=self.kwargs["hiveserver2_conn_id"])
         mock_hive_hook.return_value.get_records.assert_called_once_with("sql", 
parameters={})
-        
mock_mysql_hook.assert_called_once_with(mysql_conn_id=self.kwargs["mysql_conn_id"])
+        mock_mysql_hook.assert_called_once_with(
+            mysql_conn_id=self.kwargs["mysql_conn_id"], local_infile=False
+        )
         mock_mysql_hook.return_value.insert_rows.assert_called_once_with(
             table=self.kwargs["mysql_table"], 
rows=mock_hive_hook.return_value.get_records.return_value
         )
@@ -84,6 +86,7 @@ class TestHiveToMySqlTransfer(TestHiveEnvironment):
 
         HiveToMySqlOperator(**self.kwargs).execute(context=context)
 
+        
mock_mysql_hook.assert_called_once_with(mysql_conn_id=self.kwargs["mysql_conn_id"],
 local_infile=True)
         mock_tmp_file_context.assert_called_once_with()
         mock_hive_hook.return_value.to_csv.assert_called_once_with(
             self.kwargs["sql"],
diff --git a/tests/providers/mysql/hooks/test_mysql.py 
b/tests/providers/mysql/hooks/test_mysql.py
index eefc34c51b..1ce211e075 100644
--- a/tests/providers/mysql/hooks/test_mysql.py
+++ b/tests/providers/mysql/hooks/test_mysql.py
@@ -123,7 +123,7 @@ class TestMySqlHookConn:
 
     @mock.patch("MySQLdb.connect")
     def test_get_conn_local_infile(self, mock_connect):
-        self.connection.extra = json.dumps({"local_infile": True})
+        self.db_hook.local_infile = True
         self.db_hook.get_conn()
         assert mock_connect.call_count == 1
         args, kwargs = mock_connect.call_args
@@ -219,8 +219,8 @@ class TestMySqlHookConnMySqlConnectorPython:
     @mock.patch("mysql.connector.connect")
     def test_get_conn_allow_local_infile(self, mock_connect):
         extra_dict = self.connection.extra_dejson
-        extra_dict.update(allow_local_infile=True)
         self.connection.extra = json.dumps(extra_dict)
+        self.db_hook.local_infile = True
         self.db_hook.get_conn()
         assert mock_connect.call_count == 1
         args, kwargs = mock_connect.call_args
@@ -406,7 +406,7 @@ class TestMySql:
     @mock.patch.dict(
         "os.environ",
         {
-            "AIRFLOW_CONN_AIRFLOW_DB": 
"mysql://root@mysql/airflow?charset=utf8mb4&local_infile=1",
+            "AIRFLOW_CONN_AIRFLOW_DB": 
"mysql://root@mysql/airflow?charset=utf8mb4",
         },
     )
     def test_mysql_hook_test_bulk_load(self, client):
@@ -419,7 +419,7 @@ class TestMySql:
                 f.write("\n".join(records).encode("utf8"))
                 f.flush()
 
-                hook = MySqlHook("airflow_db")
+                hook = MySqlHook("airflow_db", local_infile=True)
                 with closing(hook.get_conn()) as conn:
                     with closing(conn.cursor()) as cursor:
                         cursor.execute(

Reply via email to