This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 42c3eaf23a Fix BigQuery connection and add docs (#38430)
42c3eaf23a is described below
commit 42c3eaf23a1cb76ae9f07b027b09948aabfcbf02
Author: Shahar Epstein <[email protected]>
AuthorDate: Tue Apr 9 14:59:58 2024 +0300
Fix BigQuery connection and add docs (#38430)
Co-authored-by: Andrey Anshin <[email protected]>
---
airflow/providers/google/cloud/hooks/bigquery.py | 76 +++++++++++++++++-----
.../connections/bigquery.rst | 62 ++++++++++++++++++
.../connections/gcp.rst | 2 +
.../providers/google/cloud/hooks/test_bigquery.py | 5 ++
4 files changed, 128 insertions(+), 17 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py
b/airflow/providers/google/cloud/hooks/bigquery.py
index ed1e284f97..a39025931d 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -28,6 +28,7 @@ import time
import uuid
from copy import deepcopy
from datetime import datetime, timedelta
+from functools import cached_property
from typing import TYPE_CHECKING, Any, Iterable, Mapping, NoReturn, Sequence,
Union, cast
from aiohttp import ClientSession as ClientSession
@@ -103,14 +104,49 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
conn_type = "gcpbigquery"
hook_name = "Google Bigquery"
+ @classmethod
+ def get_connection_form_widgets(cls) -> dict[str, Any]:
+ """Return connection widgets to add to connection form."""
+ from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+ from flask_babel import lazy_gettext
+ from wtforms import validators
+ from wtforms.fields.simple import BooleanField, StringField
+
+ from airflow.www.validators import ValidJson
+
+ connection_form_widgets = super().get_connection_form_widgets()
+ connection_form_widgets["use_legacy_sql"] =
BooleanField(lazy_gettext("Use Legacy SQL"), default=True)
+ connection_form_widgets["location"] = StringField(
+ lazy_gettext("Location"), widget=BS3TextFieldWidget()
+ )
+ connection_form_widgets["priority"] = StringField(
+ lazy_gettext("Priority"),
+ default="INTERACTIVE",
+ widget=BS3TextFieldWidget(),
+ validators=[validators.AnyOf(["INTERACTIVE", "BATCH"])],
+ )
+ connection_form_widgets["api_resource_configs"] = StringField(
+ lazy_gettext("API Resource Configs"), widget=BS3TextFieldWidget(),
validators=[ValidJson()]
+ )
+        connection_form_widgets["labels"] = StringField(
+            lazy_gettext("Labels"), widget=BS3TextFieldWidget(),
validators=[ValidJson()]
+        )
+ return connection_form_widgets
+
+ @classmethod
+ def get_ui_field_behaviour(cls) -> dict[str, Any]:
+ """Return custom field behaviour."""
+ return super().get_ui_field_behaviour()
+
def __init__(
self,
- gcp_conn_id: str = GoogleBaseHook.default_conn_name,
use_legacy_sql: bool = True,
location: str | None = None,
priority: str = "INTERACTIVE",
api_resource_configs: dict | None = None,
- impersonation_chain: str | Sequence[str] | None = None,
impersonation_scopes: str | Sequence[str] | None = None,
labels: dict | None = None,
**kwargs,
@@ -120,18 +156,25 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
"The `delegate_to` parameter has been deprecated before and
finally removed in this version"
" of Google Provider. You MUST convert it to
`impersonation_chain`"
)
- super().__init__(
- gcp_conn_id=gcp_conn_id,
- impersonation_chain=impersonation_chain,
- )
- self.use_legacy_sql = use_legacy_sql
- self.location = location
- self.priority = priority
+ super().__init__(**kwargs)
+ self.use_legacy_sql: bool = self._get_field("use_legacy_sql",
use_legacy_sql)
+ self.location: str | None = self._get_field("location", location)
+ self.priority: str = self._get_field("priority", priority)
self.running_job_id: str | None = None
- self.api_resource_configs: dict = api_resource_configs or {}
- self.labels = labels
- self.credentials_path = "bigquery_hook_credentials.json"
- self.impersonation_scopes = impersonation_scopes
+ self.api_resource_configs: dict =
self._get_field("api_resource_configs", api_resource_configs or {})
+ self.labels = self._get_field("labels", labels or {})
+ self.impersonation_scopes: str | Sequence[str] | None =
impersonation_scopes
+
+ @cached_property
+ @deprecated(
+ reason=(
+ "`BigQueryHook.credentials_path` property is deprecated and will
be removed in the future. "
+            "This property was used for obtaining the credentials path but
is no longer "
+            "in actual use. "
+ ),
+ category=AirflowProviderDeprecationWarning,
+ )
+ def credentials_path(self) -> str:
+ return "bigquery_hook_credentials.json"
def get_conn(self) -> BigQueryConnection:
"""Get a BigQuery PEP 249 connection object."""
@@ -172,18 +215,17 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
"""Override from ``DbApiHook`` for ``get_sqlalchemy_engine()``."""
return f"bigquery://{self.project_id}"
- def get_sqlalchemy_engine(self, engine_kwargs=None):
+ def get_sqlalchemy_engine(self, engine_kwargs: dict | None = None):
"""Create an SQLAlchemy engine object.
:param engine_kwargs: Kwargs used in :func:`~sqlalchemy.create_engine`.
"""
if engine_kwargs is None:
engine_kwargs = {}
- extras = self.get_connection(self.gcp_conn_id).extra_dejson
- credentials_path = get_field(extras, "key_path")
+ credentials_path = get_field(self.extras, "key_path")
if credentials_path:
return create_engine(self.get_uri(),
credentials_path=credentials_path, **engine_kwargs)
- keyfile_dict = get_field(extras, "keyfile_dict")
+ keyfile_dict = get_field(self.extras, "keyfile_dict")
if keyfile_dict:
keyfile_content = keyfile_dict if isinstance(keyfile_dict, dict)
else json.loads(keyfile_dict)
return create_engine(self.get_uri(),
credentials_info=keyfile_content, **engine_kwargs)
diff --git a/docs/apache-airflow-providers-google/connections/bigquery.rst
b/docs/apache-airflow-providers-google/connections/bigquery.rst
new file mode 100644
index 0000000000..27a66582f0
--- /dev/null
+++ b/docs/apache-airflow-providers-google/connections/bigquery.rst
@@ -0,0 +1,62 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+
+.. _howto/connection:gcpbigquery:
+
+Google Cloud BigQuery Connection
+================================
+
+The Google Cloud BigQuery connection type enables integration with the Google
Cloud BigQuery.
+As it is built on the top of Google Cloud Connection (i.e., BigQuery hook
inherits from
+GCP base hook), the basic authentication methods and parameters are exactly
the same as the Google Cloud Connection.
+Extra parameters that are specific to BigQuery will be covered in this
document.
+
+
+Configuring the Connection
+--------------------------
+.. note::
+ Please refer to :ref:`Google Cloud Connection
docs<howto/connection:gcp:configuring_the_connection>`
+ for information regarding the basic authentication parameters.
+
+Impersonation Scopes
+    A string or a list of strings of scopes to be used when impersonating a
+    service account (see the ``impersonation_scopes`` hook parameter).
+
+Use Legacy SQL
+ Whether or not the connection should utilize legacy SQL.
+
+Location
+ One of `BigQuery locations
<https://cloud.google.com/bigquery/docs/locations>`_ where the dataset resides.
+ If None, it utilizes the default location configured in the BigQuery
service.
+
+Priority
+ Should be either "INTERACTIVE" or "BATCH",
+ see `running queries docs
<https://cloud.google.com/bigquery/docs/running-queries>`_.
+ Interactive query jobs, which are jobs that BigQuery runs on demand.
+ Batch query jobs, which are jobs that BigQuery waits to run until idle
compute resources are available.
+
+API Resource Configs
+ A dictionary containing parameters for configuring the Google BigQuery
Jobs API.
+ These configurations are applied according to the specifications outlined
in the
+ `BigQuery Jobs API documentation
<https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs>`_.
+ For example, you can specify configurations such as {'query':
{'useQueryCache': False}}.
+ This parameter is useful when you need to provide additional parameters
that are not directly supported by the
+ BigQueryHook.
+
+Labels
+ A dictionary of labels to be applied on the BigQuery job.
diff --git a/docs/apache-airflow-providers-google/connections/gcp.rst
b/docs/apache-airflow-providers-google/connections/gcp.rst
index 9ebe21efe3..5418531957 100644
--- a/docs/apache-airflow-providers-google/connections/gcp.rst
+++ b/docs/apache-airflow-providers-google/connections/gcp.rst
@@ -82,6 +82,8 @@ For example:
export AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT='google-cloud-platform://'
+.. _howto/connection:gcp:configuring_the_connection:
+
Configuring the Connection
--------------------------
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py
b/tests/providers/google/cloud/hooks/test_bigquery.py
index bfb66f5e1f..b02222b350 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -77,6 +77,11 @@ def test_delegate_to_runtime_error():
@pytest.mark.db_test
class TestBigQueryHookMethods(_BigQueryBaseTestClass):
+    def test_credentials_path_deprecation(self):
+ with pytest.warns(AirflowProviderDeprecationWarning):
+ credentials_path = self.hook.credentials_path
+ assert credentials_path == "bigquery_hook_credentials.json"
+
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryConnection")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook._authorize")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.build")