This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5c5a495c2c Add encryption_configuration parameter to BigQuery operators (#40063)
5c5a495c2c is described below
commit 5c5a495c2cad9faa703eb8ccde47b368fb3eea9a
Author: VladaZakharova <[email protected]>
AuthorDate: Tue Jun 11 11:06:27 2024 +0200
Add encryption_configuration parameter to BigQuery operators (#40063)
---
.../providers/google/cloud/operators/bigquery.py | 86 ++++++++---
.../google/cloud/operators/test_bigquery.py | 164 +++++++++++++++++++++
2 files changed, 233 insertions(+), 17 deletions(-)
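As a quick orientation for reviewers, a minimal usage sketch of the new parameter follows; the DAG scaffolding, dataset, table, and KMS key path are illustrative placeholders rather than part of the commit:

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators.bigquery import BigQueryValueCheckOperator

    with DAG(dag_id="bq_value_check_cmek", start_date=datetime(2024, 6, 11), schedule=None):
        BigQueryValueCheckOperator(
            task_id="check_row_count",
            sql="SELECT COUNT(*) FROM my_dataset.my_table",
            pass_value=100,
            use_legacy_sql=False,
            # New in this commit: the check's query job is submitted with a
            # customer-managed Cloud KMS key.
            encryption_configuration={
                "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
            },
        )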
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 846f884e77..fbce7ca768 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -150,7 +150,12 @@ class _BigQueryOperatorsEncryptionConfigurationMixin:
# annotation of the `self`. Then you can inherit this class in the target operator.
# e.g: BigQueryCheckOperator, BigQueryTableCheckOperator
def include_encryption_configuration( # type:ignore[misc]
- self: BigQueryCheckOperator | BigQueryTableCheckOperator,
+ self: BigQueryCheckOperator
+ | BigQueryTableCheckOperator
+ | BigQueryValueCheckOperator
+ | BigQueryColumnCheckOperator
+ | BigQueryGetDataOperator
+ | BigQueryIntervalCheckOperator,
configuration: dict,
config_key: str,
) -> None:
@@ -206,7 +211,7 @@ class BigQueryCheckOperator(
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account. (templated)
:param labels: a dictionary containing labels for the table, passed to BigQuery.
- :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
+ :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
.. code-block:: python
@@ -327,7 +332,9 @@ class BigQueryCheckOperator(
self.log.info("Success.")
-class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
+class BigQueryValueCheckOperator(
+ _BigQueryDbHookMixin, SQLValueCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin
+):
"""Perform a simple value check using sql code.
.. seealso::
@@ -337,6 +344,13 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
:param sql: SQL to execute.
:param use_legacy_sql: Whether to use legacy SQL (true)
or standard SQL (false).
+ :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
+
+ .. code-block:: python
+
+ encryption_configuration = {
+ "kmsKeyName":
"projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+ }
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
:param location: The geographic location of the job. See details at:
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
@@ -371,6 +385,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
sql: str,
pass_value: Any,
tolerance: Any = None,
+ encryption_configuration: dict | None = None,
gcp_conn_id: str = "google_cloud_default",
use_legacy_sql: bool = True,
location: str | None = None,
@@ -384,6 +399,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
self.location = location
self.gcp_conn_id = gcp_conn_id
self.use_legacy_sql = use_legacy_sql
+ self.encryption_configuration = encryption_configuration
self.impersonation_chain = impersonation_chain
self.labels = labels
self.deferrable = deferrable
@@ -402,6 +418,8 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
},
}
+ self.include_encryption_configuration(configuration, "query")
+
return hook.insert_job(
configuration=configuration,
project_id=hook.project_id,
@@ -461,7 +479,9 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
)
-class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperator):
+class BigQueryIntervalCheckOperator(
+ _BigQueryDbHookMixin, SQLIntervalCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin
+):
"""
Check that the values of metrics given as SQL expressions are within a tolerance of the older ones.
@@ -482,6 +502,13 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
between the current day, and the prior days_back.
:param use_legacy_sql: Whether to use legacy SQL (true)
or standard SQL (false).
+ :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
+
+ .. code-block:: python
+
+ encryption_configuration = {
+ "kmsKeyName":
"projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+ }
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
:param location: The geographic location of the job. See details at:
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
@@ -521,6 +548,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
gcp_conn_id: str = "google_cloud_default",
use_legacy_sql: bool = True,
location: str | None = None,
+ encryption_configuration: dict | None = None,
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
@@ -539,6 +567,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
self.gcp_conn_id = gcp_conn_id
self.use_legacy_sql = use_legacy_sql
self.location = location
+ self.encryption_configuration = encryption_configuration
self.impersonation_chain = impersonation_chain
self.labels = labels
self.project_id = project_id
@@ -553,6 +582,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
) -> BigQueryJob:
"""Submit a new job and get the job id for polling the status using Triggerer."""
configuration = {"query": {"query": sql, "useLegacySql": self.use_legacy_sql}}
+ self.include_encryption_configuration(configuration, "query")
return hook.insert_job(
configuration=configuration,
project_id=self.project_id or hook.project_id,
@@ -609,7 +639,9 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
)
-class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
+class BigQueryColumnCheckOperator(
+ _BigQueryDbHookMixin, SQLColumnCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin
+):
"""
Subclasses the SQLColumnCheckOperator in order to provide a job id for OpenLineage to parse.
@@ -624,6 +656,13 @@ class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
:param partition_clause: a string SQL statement added to a WHERE clause
to partition data
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+ :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
+
+ .. code-block:: python
+
+ encryption_configuration = {
+ "kmsKeyName":
"projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+ }
:param use_legacy_sql: Whether to use legacy SQL (true)
or standard SQL (false).
:param location: The geographic location of the job. See details at:
@@ -651,6 +690,7 @@ class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
partition_clause: str | None = None,
database: str | None = None,
accept_none: bool = True,
+ encryption_configuration: dict | None = None,
gcp_conn_id: str = "google_cloud_default",
use_legacy_sql: bool = True,
location: str | None = None,
@@ -672,6 +712,7 @@ class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
self.database = database
self.accept_none = accept_none
self.gcp_conn_id = gcp_conn_id
+ self.encryption_configuration = encryption_configuration
self.use_legacy_sql = use_legacy_sql
self.location = location
self.impersonation_chain = impersonation_chain
@@ -684,7 +725,7 @@ class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
) -> BigQueryJob:
"""Submit a new job and get the job id for polling the status using Trigger."""
configuration = {"query": {"query": self.sql, "useLegacySql": self.use_legacy_sql}}
-
+ self.include_encryption_configuration(configuration, "query")
return hook.insert_job(
configuration=configuration,
project_id=hook.project_id,
@@ -766,7 +807,7 @@ class BigQueryTableCheckOperator(
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:param labels: a dictionary containing labels for the table, passed to BigQuery
- :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
+ :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
.. code-block:: python
@@ -852,7 +893,7 @@ class BigQueryTableCheckOperator(
self.log.info("All tests have passed")
-class BigQueryGetDataOperator(GoogleCloudBaseOperator):
+class BigQueryGetDataOperator(GoogleCloudBaseOperator, _BigQueryOperatorsEncryptionConfigurationMixin):
"""
Fetch data and return it, either from a BigQuery table, or results of a query job.
@@ -921,6 +962,13 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
from the table. (templated)
:param selected_fields: List of fields to return (comma-separated). If
unspecified, all fields are returned.
+ :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
+
+ .. code-block:: python
+
+ encryption_configuration = {
+ "kmsKeyName":
"projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+ }
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
:param location: The location used for the operation.
:param impersonation_chain: Optional service account to impersonate using short-term
@@ -965,6 +1013,7 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
selected_fields: str | None = None,
gcp_conn_id: str = "google_cloud_default",
location: str | None = None,
+ encryption_configuration: dict | None = None,
impersonation_chain: str | Sequence[str] | None = None,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
poll_interval: float = 4.0,
@@ -984,6 +1033,7 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
self.gcp_conn_id = gcp_conn_id
self.location = location
self.impersonation_chain = impersonation_chain
+ self.encryption_configuration = encryption_configuration
self.project_id = project_id
self.deferrable = deferrable
self.poll_interval = poll_interval
@@ -997,6 +1047,8 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
) -> BigQueryJob:
get_query = self.generate_query(hook=hook)
configuration = {"query": {"query": get_query, "useLegacySql":
self.use_legacy_sql}}
+ self.include_encryption_configuration(configuration, "query")
+
"""Submit a new job and get the job id for polling the status using
Triggerer."""
return hook.insert_job(
configuration=configuration,
@@ -1199,7 +1251,7 @@ class BigQueryExecuteQueryOperator(GoogleCloudBaseOperator):
:param location: The geographic location of the job. Required except for
US and EU. See details at
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
- :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
+ :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
.. code-block:: python
@@ -1393,9 +1445,9 @@ class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
.. seealso::
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning
- :param gcp_conn_id: [Optional] The connection ID used to connect to Google Cloud and
+ :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud and
interact with the Bigquery service.
- :param google_cloud_storage_conn_id: [Optional] The connection ID used to connect to Google Cloud.
+ :param google_cloud_storage_conn_id: (Optional) The connection ID used to connect to Google Cloud.
and interact with the Google Cloud Storage service.
:param labels: a dictionary containing labels for the table, passed to BigQuery
@@ -1433,13 +1485,13 @@ class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
google_cloud_storage_conn_id="airflow-conn-id",
)
- :param view: [Optional] A dictionary containing definition for the view.
+ :param view: (Optional) A dictionary containing definition for the view.
If set, it will create a view instead of a table:
.. seealso::
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition
- :param materialized_view: [Optional] The materialized view definition.
- :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
+ :param materialized_view: (Optional) The materialized view definition.
+ :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
.. code-block:: python
@@ -1447,7 +1499,7 @@ class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
"kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
}
:param location: The location used for the operation.
- :param cluster_fields: [Optional] The fields used for clustering.
+ :param cluster_fields: (Optional) The fields used for clustering.
BigQuery supports clustering for both partitioned and
non-partitioned tables.
@@ -1645,7 +1697,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
:param autodetect: Try to detect schema and format options automatically.
The schema_fields and schema_object options will be honored when specified explicitly.
https://cloud.google.com/bigquery/docs/schema-detect#schema_auto-detection_for_external_data_sources
- :param compression: [Optional] The compression type of the data source.
+ :param compression: (Optional) The compression type of the data source.
Possible values include GZIP and NONE.
The default value is NONE.
This setting is ignored for Google Cloud Bigtable,
@@ -1667,7 +1719,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
and interact with the Google Cloud Storage service.
:param src_fmt_configs: configure optional fields specific to the source format
:param labels: a dictionary containing labels for the table, passed to BigQuery
- :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
+ :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
.. code-block:: python
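The body of include_encryption_configuration is not part of this diff (the mixin already existed); judging from the job configurations asserted in the tests below, it plausibly behaves like this sketch:

    # Assumed behavior, inferred from the test assertions below; the actual
    # implementation lives outside this diff.
    def include_encryption_configuration(self, configuration: dict, config_key: str) -> None:
        # If the operator was given a CMEK config, attach it to the job config so
        # BigQuery encrypts the job's destination with the given key.
        if self.encryption_configuration is not None:
            configuration[config_key]["destinationEncryptionConfiguration"] = self.encryption_configuration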
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 16c9cbdb82..3fa3446761 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -1062,6 +1062,49 @@ class TestBigQueryGetDataOperator:
operator.execute_complete(context=None, event={"status": "success", "records": [20]})
mock_log_info.assert_called_with("Total extracted rows: %s", 1)
+ @pytest.mark.parametrize("as_dict", [True, False])
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+ def test_encryption_configuration(self, mock_job, mock_hook, as_dict):
+ encryption_configuration = {
+ "kmsKeyName":
"projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+ }
+
+ mock_hook.return_value.insert_job.return_value = mock_job
+ mock_hook.return_value.project_id = TEST_GCP_PROJECT_ID
+
+ max_results = 1
+ selected_fields = "DATE"
+ operator = BigQueryGetDataOperator(
+ job_project_id=TEST_GCP_PROJECT_ID,
+ gcp_conn_id=GCP_CONN_ID,
+ task_id=TASK_ID,
+ job_id="",
+ max_results=max_results,
+ dataset_id=TEST_DATASET,
+ table_id=TEST_TABLE_ID,
+ selected_fields=selected_fields,
+ location=TEST_DATASET_LOCATION,
+ as_dict=as_dict,
+ encryption_configuration=encryption_configuration,
+ deferrable=True,
+ )
+ with pytest.raises(TaskDeferred):
+ operator.execute(MagicMock())
+ mock_hook.return_value.insert_job.assert_called_with(
+ configuration={
+ "query": {
+ "query": f"""select DATE from
`{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}` limit 1""",
+ "useLegacySql": True,
+ "destinationEncryptionConfiguration":
encryption_configuration,
+ }
+ },
+ project_id=TEST_GCP_PROJECT_ID,
+ location=TEST_DATASET_LOCATION,
+ job_id="",
+ nowait=True,
+ )
+
class TestBigQueryTableDeleteOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -2137,6 +2180,40 @@ class TestBigQueryIntervalCheckOperator:
nowait=True,
)
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+ def test_encryption_configuration_deferrable_mode(self, mock_job, mock_hook):
+ encryption_configuration = {
+ "kmsKeyName":
"projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+ }
+
+ mock_hook.return_value.insert_job.return_value = mock_job
+ mock_hook.return_value.project_id = TEST_GCP_PROJECT_ID
+
+ operator = BigQueryIntervalCheckOperator(
+ task_id="TASK_ID",
+ encryption_configuration=encryption_configuration,
+ location=TEST_DATASET_LOCATION,
+ metrics_thresholds={"COUNT(*)": 1.5},
+ table=TEST_TABLE_ID,
+ deferrable=True,
+ )
+ with pytest.raises(TaskDeferred):
+ operator.execute(MagicMock())
+ mock_hook.return_value.insert_job.assert_called_with(
+ configuration={
+ "query": {
+ "query": """SELECT COUNT(*) FROM test-table-id WHERE
ds='{{ macros.ds_add(ds, -7) }}'""",
+ "useLegacySql": True,
+ "destinationEncryptionConfiguration":
encryption_configuration,
+ }
+ },
+ project_id=TEST_GCP_PROJECT_ID,
+ location=TEST_DATASET_LOCATION,
+ job_id="",
+ nowait=True,
+ )
+
class TestBigQueryCheckOperator:
@pytest.mark.db_test
@@ -2425,6 +2502,46 @@ class TestBigQueryValueCheckOperator:
context=None, event={"status": "error", "message": "test failure message"}
)
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+ def test_encryption_configuration_deferrable_mode(self, mock_job, mock_hook):
+ encryption_configuration = {
+ "kmsKeyName":
"projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+ }
+
+ mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
+ {
+ "check_name": ["row_count_check"],
+ "check_result": [1],
+ }
+ )
+ mock_hook.return_value.insert_job.return_value = mock_job
+ mock_hook.return_value.project_id = TEST_GCP_PROJECT_ID
+
+ operator = BigQueryValueCheckOperator(
+ task_id="TASK_ID",
+ encryption_configuration=encryption_configuration,
+ location=TEST_DATASET_LOCATION,
+ pass_value=2,
+ sql=f"SELECT COUNT(*) FROM {TEST_DATASET}.{TEST_TABLE_ID}",
+ deferrable=True,
+ )
+ with pytest.raises(TaskDeferred):
+ operator.execute(MagicMock())
+ mock_hook.return_value.insert_job.assert_called_with(
+ configuration={
+ "query": {
+ "query": f"""SELECT COUNT(*) FROM
{TEST_DATASET}.{TEST_TABLE_ID}""",
+ "useLegacySql": True,
+ "destinationEncryptionConfiguration":
encryption_configuration,
+ }
+ },
+ project_id=TEST_GCP_PROJECT_ID,
+ location=TEST_DATASET_LOCATION,
+ job_id="",
+ nowait=True,
+ )
+
@pytest.mark.db_test
class TestBigQueryColumnCheckOperator:
@@ -2495,6 +2612,53 @@ class TestBigQueryColumnCheckOperator:
with pytest.raises(AirflowException):
ti.task.execute(MagicMock())
+ @pytest.mark.parametrize(
+ "check_type, check_value, check_result",
+ [
+ ("equal_to", 0, 0),
+ ("greater_than", 0, 1),
+ ("less_than", 0, -1),
+ ],
+ )
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+ def test_encryption_configuration(self, mock_job, mock_hook, check_type, check_value, check_result):
+ encryption_configuration = {
+ "kmsKeyName":
"projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+ }
+
+ mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
+ {"col_name": ["col1"], "check_type": ["min"], "check_result":
[check_result]}
+ )
+ mock_hook.return_value.insert_job.return_value = mock_job
+ mock_hook.return_value.project_id = TEST_GCP_PROJECT_ID
+
+ operator = BigQueryColumnCheckOperator(
+ task_id="TASK_ID",
+ encryption_configuration=encryption_configuration,
+ table=f"{TEST_DATASET}.{TEST_TABLE_ID}",
+ column_mapping={"col1": {"min": {check_type: check_value}}},
+ location=TEST_DATASET_LOCATION,
+ )
+
+ operator.execute(MagicMock())
+ mock_hook.return_value.insert_job.assert_called_with(
+ configuration={
+ "query": {
+ "query": f"""SELECT col_name, check_type, check_result
FROM (
+ SELECT 'col1' AS col_name, 'min' AS check_type, col1_min AS
check_result
+ FROM (SELECT MIN(col1) AS col1_min FROM {TEST_DATASET}.{TEST_TABLE_ID}
) AS sq
+ ) AS check_columns""",
+ "useLegacySql": True,
+ "destinationEncryptionConfiguration":
encryption_configuration,
+ }
+ },
+ project_id=TEST_GCP_PROJECT_ID,
+ location=TEST_DATASET_LOCATION,
+ job_id="",
+ nowait=False,
+ )
+
class TestBigQueryTableCheckOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")