This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 45dd0c484e Move local_infile option from extra to hook parameter
(#28811)
45dd0c484e is described below
commit 45dd0c484e16ff56800cc9c047f56b4a909d2d0d
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Jan 11 09:47:43 2023 +0100
Move local_infile option from extra to hook parameter (#28811)
This change moves the local_infile parameter from the connection
extra to the Hook. Since this feature is only used in very specific
cases, it belongs to the "action" being executed, not to the connection
defined in general. For example, in the Hive and Vertica transfers, the
local_infile capability is simply enabled by the bulk_load
parameter - which allows the same connection to be used in both cases.
---
airflow/providers/apache/hive/transfers/hive_to_mysql.py | 12 ++++++------
airflow/providers/mysql/CHANGELOG.rst | 9 +++++++++
airflow/providers/mysql/hooks/mysql.py | 10 +++++++---
airflow/providers/mysql/provider.yaml | 1 +
airflow/providers/mysql/transfers/vertica_to_mysql.py | 7 +++----
.../apache-airflow-providers-mysql/connections/mysql.rst | 16 +++-------------
.../apache/hive/transfers/test_hive_to_mysql.py | 5 ++++-
tests/providers/mysql/hooks/test_mysql.py | 8 ++++----
8 files changed, 37 insertions(+), 31 deletions(-)
diff --git a/airflow/providers/apache/hive/transfers/hive_to_mysql.py
b/airflow/providers/apache/hive/transfers/hive_to_mysql.py
index b1a3669d71..6d048d7b73 100644
--- a/airflow/providers/apache/hive/transfers/hive_to_mysql.py
+++ b/airflow/providers/apache/hive/transfers/hive_to_mysql.py
@@ -50,9 +50,9 @@ class HiveToMySqlOperator(BaseOperator):
import, typically used to move data from staging to
production and issue cleanup commands. (templated)
:param bulk_load: flag to use bulk_load option. This loads mysql directly
- from a tab-delimited text file using the LOAD DATA LOCAL INFILE
command.
- This option requires an extra connection parameter for the
- destination MySQL connection: {'local_infile': true}.
+ from a tab-delimited text file using the LOAD DATA LOCAL INFILE
command. The MySQL
+ server must support loading local files via this command (it is
disabled by default).
+
:param hive_conf:
"""
@@ -105,7 +105,7 @@ class HiveToMySqlOperator(BaseOperator):
output_header=False,
hive_conf=hive_conf,
)
- mysql = self._call_preoperator()
+ mysql = self._call_preoperator(local_infile=self.bulk_load)
mysql.bulk_load(table=self.mysql_table, tmp_file=tmp_file.name)
else:
hive_results = hive.get_records(self.sql, parameters=hive_conf)
@@ -118,8 +118,8 @@ class HiveToMySqlOperator(BaseOperator):
self.log.info("Done.")
- def _call_preoperator(self):
- mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+ def _call_preoperator(self, local_infile: bool = False) -> MySqlHook:
+ mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id,
local_infile=local_infile)
if self.mysql_preoperator:
self.log.info("Running MySQL preoperator")
mysql.run(self.mysql_preoperator)
diff --git a/airflow/providers/mysql/CHANGELOG.rst
b/airflow/providers/mysql/CHANGELOG.rst
index 1ba558bb53..66e4742628 100644
--- a/airflow/providers/mysql/CHANGELOG.rst
+++ b/airflow/providers/mysql/CHANGELOG.rst
@@ -27,6 +27,15 @@ used with MySQL server 5.6.4 through 5.7.
Changelog
---------
+4.0.0
+.....
+
+Breaking Changes
+~~~~~~~~~~~~~~~~
+
+You can no longer enable local infile by passing "local_infile" (or "allow_local_infile" for ``mysql-connector-python``) as an extra in the connection.
+Instead, pass it as the hook's "local_infile" parameter when you create the MySqlHook (either directly or via hook_params).
+
3.4.0
.....
diff --git a/airflow/providers/mysql/hooks/mysql.py
b/airflow/providers/mysql/hooks/mysql.py
index 843ac0b8aa..83a25f1aed 100644
--- a/airflow/providers/mysql/hooks/mysql.py
+++ b/airflow/providers/mysql/hooks/mysql.py
@@ -45,8 +45,12 @@ class MySqlHook(DbApiHook):
in extras.
extras example: ``{"iam":true, "aws_conn_id":"my_aws_conn"}``
+ You can also add "local_infile" parameter to determine whether
local_infile feature of MySQL client is
+ going to be enabled (it is disabled by default).
+
:param schema: The MySQL database schema to connect to.
:param connection: The :ref:`MySQL connection id <howto/connection:mysql>`
used for MySQL credentials.
+ :param local_infile: Boolean flag determining if local_infile should be
used
"""
conn_name_attr = "mysql_conn_id"
@@ -59,6 +63,7 @@ class MySqlHook(DbApiHook):
super().__init__(*args, **kwargs)
self.schema = kwargs.pop("schema", None)
self.connection = kwargs.pop("connection", None)
+ self.local_infile = kwargs.pop("local_infile", False)
def set_autocommit(self, conn: MySQLConnectionTypes, autocommit: bool) ->
None:
"""
@@ -118,7 +123,6 @@ class MySqlHook(DbApiHook):
conn_config["cursorclass"] = MySQLdb.cursors.DictCursor
elif (conn.extra_dejson["cursor"]).lower() == "ssdictcursor":
conn_config["cursorclass"] = MySQLdb.cursors.SSDictCursor
- local_infile = conn.extra_dejson.get("local_infile", False)
if conn.extra_dejson.get("ssl", False):
# SSL parameter for MySQL has to be a dictionary and in case
# of extra/dejson we can get string if extra is passed via
@@ -131,7 +135,7 @@ class MySqlHook(DbApiHook):
conn_config["ssl_mode"] = conn.extra_dejson["ssl_mode"]
if conn.extra_dejson.get("unix_socket"):
conn_config["unix_socket"] = conn.extra_dejson["unix_socket"]
- if local_infile:
+ if self.local_infile:
conn_config["local_infile"] = 1
return conn_config
@@ -144,7 +148,7 @@ class MySqlHook(DbApiHook):
"port": int(conn.port) if conn.port else 3306,
}
- if conn.extra_dejson.get("allow_local_infile", False):
+ if self.local_infile:
conn_config["allow_local_infile"] = True
# Ref:
https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html
for key, value in conn.extra_dejson.items():
diff --git a/airflow/providers/mysql/provider.yaml
b/airflow/providers/mysql/provider.yaml
index 28d3fea687..d85abecf33 100644
--- a/airflow/providers/mysql/provider.yaml
+++ b/airflow/providers/mysql/provider.yaml
@@ -22,6 +22,7 @@ description: |
`MySQL <https://www.mysql.com/products/>`__
versions:
+ - 4.0.0
- 3.4.0
- 3.3.0
- 3.2.1
diff --git a/airflow/providers/mysql/transfers/vertica_to_mysql.py
b/airflow/providers/mysql/transfers/vertica_to_mysql.py
index a7df1f029d..5f2274c6b4 100644
--- a/airflow/providers/mysql/transfers/vertica_to_mysql.py
+++ b/airflow/providers/mysql/transfers/vertica_to_mysql.py
@@ -49,9 +49,8 @@ class VerticaToMySqlOperator(BaseOperator):
import, typically used to move data from staging to production
and issue cleanup commands. (templated)
:param bulk_load: flag to use bulk_load option. This loads MySQL directly
- from a tab-delimited text file using the LOAD DATA LOCAL INFILE
command.
- This option requires an extra connection parameter for the
- destination MySQL connection: {'local_infile': true}.
+ from a tab-delimited text file using the LOAD DATA LOCAL INFILE
command. The MySQL
+ server must support loading local files via this command (it is
disabled by default).
"""
template_fields: Sequence[str] = ("sql", "mysql_table",
"mysql_preoperator", "mysql_postoperator")
@@ -86,7 +85,7 @@ class VerticaToMySqlOperator(BaseOperator):
def execute(self, context: Context):
vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id)
- mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+ mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id,
local_infile=self.bulk_load)
if self.bulk_load:
self._bulk_load_transfer(mysql, vertica)
diff --git a/docs/apache-airflow-providers-mysql/connections/mysql.rst
b/docs/apache-airflow-providers-mysql/connections/mysql.rst
index 95d8e7aaba..e8b8091b83 100644
--- a/docs/apache-airflow-providers-mysql/connections/mysql.rst
+++ b/docs/apache-airflow-providers-mysql/connections/mysql.rst
@@ -46,9 +46,6 @@ Extra (optional)
* ``charset``: specify charset of the connection
* ``cursor``: one of ``sscursor``, ``dictcursor``, ``ssdictcursor`` .
Specifies cursor class to be
used
- * ``local_infile``: controls MySQL's LOCAL capability (permitting local
data loading by
- clients). See `MySQLdb docs
<https://mysqlclient.readthedocs.io/user_guide.html>`_
- for details.
* ``unix_socket``: UNIX socket used instead of the default socket.
* ``ssl``: Dictionary of SSL parameters that control connecting using
SSL. Those
parameters are server specific and should contain ``ca``, ``cert``,
``key``, ``capath``,
@@ -99,14 +96,7 @@ Extra (optional)
If encounter UnicodeDecodeError while working with MySQL connection,
check
the charset defined is matched to the database charset.
- For ``mysql-connector-python`` the following extras are supported:
+ For ``mysql-connector-python``, no extras are supported.
- * ``allow_local_infile``: Whether to enable ``LOAD DATA LOCAL INFILE``
capability.
-
- Example "extras" field:
-
- .. code-block:: json
-
- {
- "allow_local_infile": true
- }
+In both cases, when you want to use ``LOAD DATA LOCAL INFILE`` SQL commands of MySQL, you need to create the
+Hook with the ``local_infile`` parameter set to ``True``.
diff --git a/tests/providers/apache/hive/transfers/test_hive_to_mysql.py
b/tests/providers/apache/hive/transfers/test_hive_to_mysql.py
index d8f39812b2..5fefc2594e 100644
--- a/tests/providers/apache/hive/transfers/test_hive_to_mysql.py
+++ b/tests/providers/apache/hive/transfers/test_hive_to_mysql.py
@@ -49,7 +49,9 @@ class TestHiveToMySqlTransfer(TestHiveEnvironment):
mock_hive_hook.assert_called_once_with(hiveserver2_conn_id=self.kwargs["hiveserver2_conn_id"])
mock_hive_hook.return_value.get_records.assert_called_once_with("sql",
parameters={})
-
mock_mysql_hook.assert_called_once_with(mysql_conn_id=self.kwargs["mysql_conn_id"])
+ mock_mysql_hook.assert_called_once_with(
+ mysql_conn_id=self.kwargs["mysql_conn_id"], local_infile=False
+ )
mock_mysql_hook.return_value.insert_rows.assert_called_once_with(
table=self.kwargs["mysql_table"],
rows=mock_hive_hook.return_value.get_records.return_value
)
@@ -84,6 +86,7 @@ class TestHiveToMySqlTransfer(TestHiveEnvironment):
HiveToMySqlOperator(**self.kwargs).execute(context=context)
+
mock_mysql_hook.assert_called_once_with(mysql_conn_id=self.kwargs["mysql_conn_id"],
local_infile=True)
mock_tmp_file_context.assert_called_once_with()
mock_hive_hook.return_value.to_csv.assert_called_once_with(
self.kwargs["sql"],
diff --git a/tests/providers/mysql/hooks/test_mysql.py
b/tests/providers/mysql/hooks/test_mysql.py
index eefc34c51b..1ce211e075 100644
--- a/tests/providers/mysql/hooks/test_mysql.py
+++ b/tests/providers/mysql/hooks/test_mysql.py
@@ -123,7 +123,7 @@ class TestMySqlHookConn:
@mock.patch("MySQLdb.connect")
def test_get_conn_local_infile(self, mock_connect):
- self.connection.extra = json.dumps({"local_infile": True})
+ self.db_hook.local_infile = True
self.db_hook.get_conn()
assert mock_connect.call_count == 1
args, kwargs = mock_connect.call_args
@@ -219,8 +219,8 @@ class TestMySqlHookConnMySqlConnectorPython:
@mock.patch("mysql.connector.connect")
def test_get_conn_allow_local_infile(self, mock_connect):
extra_dict = self.connection.extra_dejson
- extra_dict.update(allow_local_infile=True)
self.connection.extra = json.dumps(extra_dict)
+ self.db_hook.local_infile = True
self.db_hook.get_conn()
assert mock_connect.call_count == 1
args, kwargs = mock_connect.call_args
@@ -406,7 +406,7 @@ class TestMySql:
@mock.patch.dict(
"os.environ",
{
- "AIRFLOW_CONN_AIRFLOW_DB":
"mysql://root@mysql/airflow?charset=utf8mb4&local_infile=1",
+ "AIRFLOW_CONN_AIRFLOW_DB":
"mysql://root@mysql/airflow?charset=utf8mb4",
},
)
def test_mysql_hook_test_bulk_load(self, client):
@@ -419,7 +419,7 @@ class TestMySql:
f.write("\n".join(records).encode("utf8"))
f.flush()
- hook = MySqlHook("airflow_db")
+ hook = MySqlHook("airflow_db", local_infile=True)
with closing(hook.get_conn()) as conn:
with closing(conn.cursor()) as cursor:
cursor.execute(