This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a77bc6  :bug: (BigQueryHook) fix compatibility with sqlalchemy engine (#19508)
1a77bc6 is described below

commit 1a77bc6481580ab6817267b6e075634caaa025be
Author: 張泰瑋(Chang Tai Wei) <[email protected]>
AuthorDate: Mon Feb 7 04:56:52 2022 +0800

    :bug: (BigQueryHook) fix compatibility with sqlalchemy engine (#19508)
---
 airflow/providers/google/cloud/hooks/bigquery.py   | 37 ++++++++++++++++++++++
 setup.py                                           |  1 +
 .../providers/google/cloud/hooks/test_bigquery.py  | 13 ++++++++
 3 files changed, 51 insertions(+)

diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index c2a22d8..feb90f4 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -51,6 +51,7 @@ from pandas_gbq.gbq import (
     _check_google_client_version as gbq_check_google_client_version,
     _test_google_api_imports as gbq_test_google_api_imports,
 )
+from sqlalchemy import create_engine
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.dbapi import DbApiHook
@@ -114,6 +115,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         self.running_job_id = None  # type: Optional[str]
         self.api_resource_configs = api_resource_configs if api_resource_configs else {}  # type Dict
         self.labels = labels
+        self.credentials_path = "bigquery_hook_credentials.json"
 
     def get_conn(self) -> "BigQueryConnection":
         """Returns a BigQuery PEP 249 connection object."""
@@ -150,6 +152,41 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
             credentials=self._get_credentials(),
         )
 
+    def get_uri(self) -> str:
+        """Override DbApiHook get_uri method for get_sqlalchemy_engine()"""
+        return f"bigquery://{self.project_id}"
+
+    def get_sqlalchemy_engine(self, engine_kwargs=None):
+        """
+        Get an sqlalchemy_engine object.
+
+        :param engine_kwargs: Kwargs used in :func:`~sqlalchemy.create_engine`.
+        :return: the created engine.
+        """
+        connection = self.get_connection(self.gcp_conn_id)
+        if connection.extra_dejson.get("extra__google_cloud_platform__key_path"):
+            credentials_path = connection.extra_dejson['extra__google_cloud_platform__key_path']
+            return create_engine(self.get_uri(), credentials_path=credentials_path, **engine_kwargs)
+        elif connection.extra_dejson.get("extra__google_cloud_platform__keyfile_dict"):
+            credential_file_content = json.loads(
+                connection.extra_dejson["extra__google_cloud_platform__keyfile_dict"]
+            )
+            return create_engine(self.get_uri(), credentials_info=credential_file_content, **engine_kwargs)
+        try:
+            # 1. If the environment variable GOOGLE_APPLICATION_CREDENTIALS is set
+            # ADC uses the service account key or configuration file that the variable points to.
+            # 2. If the environment variable GOOGLE_APPLICATION_CREDENTIALS isn't set
+            # ADC uses the service account that is attached to the resource that is running your code.
+            return create_engine(self.get_uri(), **engine_kwargs)
+        except Exception as e:
+            self.log.error(e)
+            raise AirflowException(
+                "For now, we only support instantiating SQLAlchemy engine by"
+                " using ADC"
+                ", extra__google_cloud_platform__key_path"
+                "and extra__google_cloud_platform__keyfile_dict"
+            )
+
     @staticmethod
     def _resolve_table_reference(
         table_resource: Dict[str, Any],
diff --git a/setup.py b/setup.py
index b55ca64..85c9fd7 100644
--- a/setup.py
+++ b/setup.py
@@ -343,6 +343,7 @@ google = [
     # _check_google_client_version (airflow/providers/google/cloud/hooks/bigquery.py:49)
     'pandas-gbq<0.15.0',
     pandas_requirement,
+    'sqlalchemy-bigquery>=1.2.1',
 ]
 grpc = [
     'google-auth>=1.0.0, <3.0.0',
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py
index e0c2341..48f15fd 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -930,6 +930,19 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
         )
         mock_query_job.from_api_repr.return_value.result.assert_called_once_with()
 
+    def test_dbapi_get_uri(self):
+        assert self.hook.get_uri().startswith('bigquery://')
+
+    def test_dbapi_get_sqlalchemy_engine(self):
+        with pytest.raises(
+            AirflowException,
+            match="For now, we only support instantiating SQLAlchemy engine by"
+            " using ADC"
+            ", extra__google_cloud_platform__key_path"
+            "and extra__google_cloud_platform__keyfile_dict",
+        ):
+            self.hook.get_sqlalchemy_engine()
+
 
 class TestBigQueryTableSplitter(unittest.TestCase):
     def test_internal_need_default_project(self):

Reply via email to