This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 64a3787c24 Add various Vertica connection parameters (#32089)
64a3787c24 is described below
commit 64a3787c2414cc5f2e07ff10477c824efc2e9a5d
Author: darkag <[email protected]>
AuthorDate: Fri Jun 30 09:55:23 2023 +0200
Add various Vertica connection parameters (#32089)
---------
Co-authored-by: ivascot <[email protected]>
---
airflow/providers/vertica/hooks/vertica.py | 51 +++++++++++++
.../connections/vertica.rst | 83 ++++++++++++++++++++++
docs/apache-airflow-providers-vertica/index.rst | 7 ++
tests/providers/vertica/hooks/test_vertica.py | 76 ++++++++++++++++++++
4 files changed, 217 insertions(+)
diff --git a/airflow/providers/vertica/hooks/vertica.py
b/airflow/providers/vertica/hooks/vertica.py
index 92a74ea369..06b2e3cf17 100644
--- a/airflow/providers/vertica/hooks/vertica.py
+++ b/airflow/providers/vertica/hooks/vertica.py
@@ -46,5 +46,56 @@ class VerticaHook(DbApiHook):
else:
conn_config["port"] = int(conn.port)
+ bool_options = [
+ "connection_load_balance",
+ "binary_transfer",
+ "disable_copy_local",
+ "request_complex_types",
+ "use_prepared_statements",
+ ]
+ std_options = [
+ "session_label",
+ "backup_server_node",
+ "kerberos_host_name",
+ "kerberos_service_name",
+ "unicode_error",
+ "workload",
+ "ssl",
+ ]
+ conn_extra = conn.extra_dejson
+
+ for bo in bool_options:
+ if bo in conn_extra:
+ conn_config[bo] = str(conn_extra[bo]).lower() in ["true", "on"]
+
+ for so in std_options:
+ if so in conn_extra:
+ conn_config[so] = conn_extra[so]
+
+ if "connection_timeout" in conn_extra:
+ conn_config["connection_timeout"] =
float(conn_extra["connection_timeout"])
+
+ if "log_level" in conn_extra:
+ import logging
+
+ log_lvl = conn_extra["log_level"]
+ conn_config["log_path"] = None
+ if isinstance(log_lvl, str):
+ log_lvl = log_lvl.lower()
+ if log_lvl == "critical":
+ conn_config["log_level"] = logging.CRITICAL
+ elif log_lvl == "error":
+ conn_config["log_level"] = logging.ERROR
+ elif log_lvl == "warning":
+ conn_config["log_level"] = logging.WARNING
+ elif log_lvl == "info":
+ conn_config["log_level"] = logging.INFO
+ elif log_lvl == "debug":
+ conn_config["log_level"] = logging.DEBUG
+ elif log_lvl == "notset":
+ conn_config["log_level"] = logging.NOTSET
+ else:
+ conn_config["log_level"] = int(conn_extra["log_level"])
+
conn = connect(**conn_config)
return conn
diff --git a/docs/apache-airflow-providers-vertica/connections/vertica.rst
b/docs/apache-airflow-providers-vertica/connections/vertica.rst
new file mode 100644
index 0000000000..86f583a548
--- /dev/null
+++ b/docs/apache-airflow-providers-vertica/connections/vertica.rst
@@ -0,0 +1,83 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+
+.. _howto/connection:vertica:
+
+Vertica Connection
+==================
+The Vertica connection type provides connection to a Vertica database.
+
+Configuring the Connection
+--------------------------
+Host (required)
+ The host to connect to.
+
+Schema (optional)
+ Specify the schema name to be used in the database.
+
+Login (required)
+ Specify the user name to connect.
+
+Password (required)
+ Specify the password to connect.
+
+Extra (optional)
+ Specify the extra parameters (as json dictionary) that can be used in
Vertica
+ connection.
+
+ The following extras are supported:
+
+ * ``backup_server_node``: See `Connection Failover
<https://github.com/vertica/vertica-python#connection-failover>`_.
+ * ``binary_transfer``: See `Data Transfer Format
<https://github.com/vertica/vertica-python#data-transfer-format>`_.
+ * ``connection_load_balance``: See `Connection Load Balancing
<https://github.com/vertica/vertica-python#connection-load-balancing>`_.
+ * ``connection_timeout``: The number of seconds (can be a nonnegative
floating point number) the client
+ waits for a socket operation (Establishing a TCP connection or
read/write operation).
+ * ``disable_copy_local``: See `COPY FROM LOCAL
<https://github.com/vertica/vertica-python#method-2-copy-from-local-sql-with-cursorexecute>`_.
+ * ``kerberos_host_name``: See `Kerberos Authentication
<https://github.com/vertica/vertica-python#kerberos-authentication>`_.
+ * ``kerberos_service_name``: See `Kerberos Authentication
<https://github.com/vertica/vertica-python#kerberos-authentication>`_.
+ * ``log_level``: Enable vertica client logging. Traces will be visible
in tasks log. See `Logging
<https://github.com/vertica/vertica-python#logging>`_.
+ * ``request_complex_types:``: See `SQL Data conversion to Python objects
<https://github.com/vertica/vertica-python#sql-data-conversion-to-python-objects>`_.
+ * ``session_label``: Sets a label for the connection on the server.
+ * ``ssl``: Support only True or False. See `TLS/SSL
<https://github.com/vertica/vertica-python#tlsssl>`_.
+ * ``unicode_error``: See `UTF-8 encoding issues
<https://github.com/vertica/vertica-python#utf-8-encoding-issues>`_.
+ * ``use_prepared_statements``: See `Passing parameters to SQL queries
<https://github.com/vertica/vertica-python#passing-parameters-to-sql-queries>`_.
+ * ``workload``: Sets the workload name associated with this session.
+
+ See `vertica-python docs
<https://github.com/vertica/vertica-python#usage>`_ for details.
+
+
+ Example "extras" field:
+
+ .. code-block:: json
+
+ {
+ "connection_load_balance": true,
+ "log_level": "error",
+ "ssl": true
+ }
+
+ or
+
+ .. code-block:: json
+
+ {
+ "session_label": "airflow-session",
+ "connection_timeout": 30,
+ "backup_server_node": ["bck_server_1", "bck_server_2"]
+ }
diff --git a/docs/apache-airflow-providers-vertica/index.rst
b/docs/apache-airflow-providers-vertica/index.rst
index db09f1924c..ae7c2457b8 100644
--- a/docs/apache-airflow-providers-vertica/index.rst
+++ b/docs/apache-airflow-providers-vertica/index.rst
@@ -29,6 +29,13 @@
Changelog <changelog>
Security <security>
+.. toctree::
+ :hidden:
+ :maxdepth: 1
+ :caption: Guides
+
+ Connection types <connections/vertica>
+
.. toctree::
:hidden:
:maxdepth: 1
diff --git a/tests/providers/vertica/hooks/test_vertica.py
b/tests/providers/vertica/hooks/test_vertica.py
index e78c2a0c5c..146c3bcd11 100644
--- a/tests/providers/vertica/hooks/test_vertica.py
+++ b/tests/providers/vertica/hooks/test_vertica.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+import json
from unittest import mock
from unittest.mock import patch
@@ -47,6 +48,81 @@ class TestVerticaHookConn:
host="host", port=5433, database="vertica", user="login",
password="password"
)
+ @patch("airflow.providers.vertica.hooks.vertica.connect")
+ def test_get_conn_extra_parameters_no_cast(self, mock_connect):
+ """Test if parameters are correctly passed to connection"""
+ extra_dict = self.connection.extra_dejson
+ bool_options = [
+ "connection_load_balance",
+ "binary_transfer",
+ "disable_copy_local",
+ "use_prepared_statements",
+ ]
+ for bo in bool_options:
+ extra_dict.update({bo: True})
+ extra_dict.update({"request_complex_types": False})
+
+ std_options = [
+ "session_label",
+ "kerberos_host_name",
+ "kerberos_service_name",
+ "unicode_error",
+ "workload",
+ "ssl",
+ ]
+ for so in std_options:
+ extra_dict.update({so: so})
+ bck_server_node = ["1.2.3.4", "4.3.2.1"]
+ conn_timeout = 30
+ log_lvl = 40
+ extra_dict.update({"backup_server_node": bck_server_node})
+ extra_dict.update({"connection_timeout": conn_timeout})
+ extra_dict.update({"log_level": log_lvl})
+ self.connection.extra = json.dumps(extra_dict)
+ self.db_hook.get_conn()
+ assert mock_connect.call_count == 1
+ args, kwargs = mock_connect.call_args
+ assert args == ()
+ for bo in bool_options:
+ assert kwargs[bo] is True
+ assert kwargs["request_complex_types"] is False
+ for so in std_options:
+ assert kwargs[so] == so
+ assert bck_server_node[0] in kwargs["backup_server_node"]
+ assert bck_server_node[1] in kwargs["backup_server_node"]
+ assert kwargs["connection_timeout"] == conn_timeout
+ assert kwargs["log_level"] == log_lvl
+ assert kwargs["log_path"] is None
+
+ @patch("airflow.providers.vertica.hooks.vertica.connect")
+ def test_get_conn_extra_parameters_cast(self, mock_connect):
+ """Test if parameters that can be passed either as string or int/bool
+ like log_level are correctly converted when passed as string
+ (while test_get_conn_extra_parameters_no_cast tests them passed as
int/bool)"""
+ import logging
+
+ extra_dict = self.connection.extra_dejson
+ bool_options = [
+ "connection_load_balance",
+ "binary_transfer",
+ "disable_copy_local",
+ "use_prepared_statements",
+ ]
+ for bo in bool_options:
+ extra_dict.update({bo: "True"})
+ extra_dict.update({"request_complex_types": "False"})
+ extra_dict.update({"log_level": "Error"})
+ self.connection.extra = json.dumps(extra_dict)
+ self.db_hook.get_conn()
+ assert mock_connect.call_count == 1
+ args, kwargs = mock_connect.call_args
+ assert args == ()
+ for bo in bool_options:
+ assert kwargs[bo] is True
+ assert kwargs["request_complex_types"] is False
+ assert kwargs["log_level"] == logging.ERROR
+ assert kwargs["log_path"] is None
+
class TestVerticaHook:
def setup_method(self):