This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1a77bc6 :bug: (BigQueryHook) fix compatibility with sqlalchemy engine (#19508)
1a77bc6 is described below
commit 1a77bc6481580ab6817267b6e075634caaa025be
Author: 張泰瑋(Chang Tai Wei) <[email protected]>
AuthorDate: Mon Feb 7 04:56:52 2022 +0800
:bug: (BigQueryHook) fix compatibility with sqlalchemy engine (#19508)
---
airflow/providers/google/cloud/hooks/bigquery.py | 37 ++++++++++++++++++++++
setup.py | 1 +
.../providers/google/cloud/hooks/test_bigquery.py | 13 ++++++++
3 files changed, 51 insertions(+)
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index c2a22d8..feb90f4 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -51,6 +51,7 @@ from pandas_gbq.gbq import (
_check_google_client_version as gbq_check_google_client_version,
_test_google_api_imports as gbq_test_google_api_imports,
)
+from sqlalchemy import create_engine
from airflow.exceptions import AirflowException
from airflow.hooks.dbapi import DbApiHook
@@ -114,6 +115,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
self.running_job_id = None # type: Optional[str]
self.api_resource_configs = api_resource_configs if api_resource_configs else {} # type Dict
self.labels = labels
+ self.credentials_path = "bigquery_hook_credentials.json"
def get_conn(self) -> "BigQueryConnection":
"""Returns a BigQuery PEP 249 connection object."""
@@ -150,6 +152,41 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
credentials=self._get_credentials(),
)
+ def get_uri(self) -> str:
+ """Override DbApiHook get_uri method for get_sqlalchemy_engine()"""
+ return f"bigquery://{self.project_id}"
+
+ def get_sqlalchemy_engine(self, engine_kwargs=None):
+ """
+ Get an sqlalchemy_engine object.
+
+ :param engine_kwargs: Kwargs used in :func:`~sqlalchemy.create_engine`.
+ :return: the created engine.
+ """
+ connection = self.get_connection(self.gcp_conn_id)
+ if connection.extra_dejson.get("extra__google_cloud_platform__key_path"):
+ credentials_path = connection.extra_dejson['extra__google_cloud_platform__key_path']
+ return create_engine(self.get_uri(), credentials_path=credentials_path, **engine_kwargs)
+ elif connection.extra_dejson.get("extra__google_cloud_platform__keyfile_dict"):
+ credential_file_content = json.loads(
+ connection.extra_dejson["extra__google_cloud_platform__keyfile_dict"]
+ )
+ return create_engine(self.get_uri(), credentials_info=credential_file_content, **engine_kwargs)
+ try:
+ # 1. If the environment variable GOOGLE_APPLICATION_CREDENTIALS is set
+ # ADC uses the service account key or configuration file that the variable points to.
+ # 2. If the environment variable GOOGLE_APPLICATION_CREDENTIALS isn't set
+ # ADC uses the service account that is attached to the resource that is running your code.
+ return create_engine(self.get_uri(), **engine_kwargs)
+ except Exception as e:
+ self.log.error(e)
+ raise AirflowException(
+ "For now, we only support instantiating SQLAlchemy engine by"
+ " using ADC"
+ ", extra__google_cloud_platform__key_path"
+ "and extra__google_cloud_platform__keyfile_dict"
+ )
+
@staticmethod
def _resolve_table_reference(
table_resource: Dict[str, Any],
diff --git a/setup.py b/setup.py
index b55ca64..85c9fd7 100644
--- a/setup.py
+++ b/setup.py
@@ -343,6 +343,7 @@ google = [
# _check_google_client_version (airflow/providers/google/cloud/hooks/bigquery.py:49)
'pandas-gbq<0.15.0',
pandas_requirement,
+ 'sqlalchemy-bigquery>=1.2.1',
]
grpc = [
'google-auth>=1.0.0, <3.0.0',
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py
index e0c2341..48f15fd 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -930,6 +930,19 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
)
mock_query_job.from_api_repr.return_value.result.assert_called_once_with()
+ def test_dbapi_get_uri(self):
+ assert self.hook.get_uri().startswith('bigquery://')
+
+ def test_dbapi_get_sqlalchemy_engine(self):
+ with pytest.raises(
+ AirflowException,
+ match="For now, we only support instantiating SQLAlchemy engine by"
+ " using ADC"
+ ", extra__google_cloud_platform__key_path"
+ "and extra__google_cloud_platform__keyfile_dict",
+ ):
+ self.hook.get_sqlalchemy_engine()
+
class TestBigQueryTableSplitter(unittest.TestCase):
def test_internal_need_default_project(self):