This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a5adb87ab4 Add a new param for BigQuery operators to support 
additional actions when resource exists (#29394)
a5adb87ab4 is described below

commit a5adb87ab4ee537eb37ef31aba755b40f6f29a1e
Author: Hussein Awala <[email protected]>
AuthorDate: Sun Feb 26 20:09:08 2023 +0100

    Add a new param for BigQuery operators to support additional actions when 
resource exists (#29394)
    
    * Add a new param to support additional actions when resource exists and 
deprecate old one
    ---------
    
    Co-authored-by: eladkal <[email protected]>
---
 .../providers/google/cloud/operators/bigquery.py   | 101 ++++++++++++++-------
 .../google/cloud/operators/test_bigquery.py        |  71 ++++++++++++++-
 2 files changed, 140 insertions(+), 32 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py 
b/airflow/providers/google/cloud/operators/bigquery.py
index 23144c1c11..75b8c02a0d 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -28,7 +28,7 @@ from google.api_core.exceptions import Conflict
 from google.api_core.retry import Retry
 from google.cloud.bigquery import DEFAULT_RETRY, CopyJob, ExtractJob, LoadJob, 
QueryJob
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowSkipException
 from airflow.models import BaseOperator, BaseOperatorLink
 from airflow.models.xcom import XCom
 from airflow.providers.common.sql.operators.sql import (
@@ -68,6 +68,15 @@ class BigQueryUIColors(enum.Enum):
     DATASET = "#5F86FF"
 
 
+class IfExistAction(enum.Enum):
+    """Action to take if the resource exist"""
+
+    IGNORE = "ignore"
+    LOG = "log"
+    FAIL = "fail"
+    SKIP = "skip"
+
+
 class BigQueryConsoleLink(BaseOperatorLink):
     """Helper class for constructing BigQuery link."""
 
@@ -248,7 +257,9 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin, 
SQLCheckOperator):
         if not records:
             raise AirflowException("The query returned empty results")
         elif not all(bool(r) for r in records):
-            self._raise_exception(f"Test 
failed.\nQuery:\n{self.sql}\nResults:\n{records!s}")
+            self._raise_exception(  # type: ignore[attr-defined]
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}"
+            )
         self.log.info("Record: %s", event["records"])
         self.log.info("Success.")
 
@@ -773,9 +784,6 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
     :param selected_fields: List of fields to return (comma-separated). If
         unspecified, all fields are returned.
     :param gcp_conn_id: (Optional) The connection ID used to connect to Google 
Cloud.
-    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
-        if any. For this to work, the service account making the request must 
have
-        domain-wide delegation enabled. Deprecated.
     :param location: The location used for the operation.
     :param impersonation_chain: Optional service account to impersonate using 
short-term
         credentials, or chained list of accounts required to get the 
access_token
@@ -786,6 +794,9 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
         Service Account Token Creator IAM role to the directly preceding 
identity, with first
         account from the list granting this role to the originating account 
(templated).
     :param deferrable: Run operator in the deferrable mode
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
+        domain-wide delegation enabled. Deprecated.
     """
 
     template_fields: Sequence[str] = (
@@ -807,10 +818,10 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
         max_results: int = 100,
         selected_fields: str | None = None,
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
         location: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
         deferrable: bool = False,
+        delegate_to: str | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -1253,7 +1264,10 @@ class 
BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
         If set as a sequence, the identities from the list must grant
         Service Account Token Creator IAM role to the directly preceding 
identity, with first
         account from the list granting this role to the originating account 
(templated).
-    :param exists_ok: If ``True``, ignore "already exists" errors when 
creating the table.
+    :param if_exists: What should Airflow do if the table exists. If set to 
`log`, the TI will be passed to
+        success and an error message will be logged. Set to `ignore` to ignore 
the error, set to `fail` to
+        fail the TI, and set to `skip` to skip it.
+    :param exists_ok: Deprecated - use `if_exists="ignore"` instead.
     """
 
     template_fields: Sequence[str] = (
@@ -1282,9 +1296,7 @@ class 
BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
         gcs_schema_object: str | None = None,
         time_partitioning: dict | None = None,
         gcp_conn_id: str = "google_cloud_default",
-        bigquery_conn_id: str | None = None,
         google_cloud_storage_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
         labels: dict | None = None,
         view: dict | None = None,
         materialized_view: dict | None = None,
@@ -1292,7 +1304,10 @@ class 
BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
         location: str | None = None,
         cluster_fields: list[str] | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
-        exists_ok: bool = False,
+        if_exists: str = "log",
+        delegate_to: str | None = None,
+        bigquery_conn_id: str | None = None,
+        exists_ok: bool | None = None,
         **kwargs,
     ) -> None:
         if bigquery_conn_id:
@@ -1326,7 +1341,11 @@ class 
BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
         self.cluster_fields = cluster_fields
         self.table_resource = table_resource
         self.impersonation_chain = impersonation_chain
-        self.exists_ok = exists_ok
+        if exists_ok is not None:
+            warnings.warn("`exists_ok` parameter is deprecated, please use 
`if_exists`", DeprecationWarning)
+            self.if_exists = IfExistAction.IGNORE if exists_ok else 
IfExistAction.LOG
+        else:
+            self.if_exists = IfExistAction(if_exists)
 
     def execute(self, context: Context) -> None:
         bq_hook = BigQueryHook(
@@ -1362,7 +1381,7 @@ class 
BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
                 materialized_view=self.materialized_view,
                 encryption_configuration=self.encryption_configuration,
                 table_resource=self.table_resource,
-                exists_ok=self.exists_ok,
+                exists_ok=self.if_exists == IfExistAction.IGNORE,
             )
             BigQueryTableLink.persist(
                 context=context,
@@ -1375,7 +1394,13 @@ class 
BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
                 "Table %s.%s.%s created successfully", table.project, 
table.dataset_id, table.table_id
             )
         except Conflict:
-            self.log.info("Table %s.%s already exists.", self.dataset_id, 
self.table_id)
+            error_msg = f"Table {self.dataset_id}.{self.table_id} already 
exists."
+            if self.if_exists == IfExistAction.LOG:
+                self.log.info(error_msg)
+            elif self.if_exists == IfExistAction.FAIL:
+                raise AirflowException(error_msg)
+            else:
+                raise AirflowSkipException(error_msg)
 
 
 class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
@@ -1490,14 +1515,14 @@ class 
BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
         allow_quoted_newlines: bool = False,
         allow_jagged_rows: bool = False,
         gcp_conn_id: str = "google_cloud_default",
-        bigquery_conn_id: str | None = None,
         google_cloud_storage_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
         src_fmt_configs: dict | None = None,
         labels: dict | None = None,
         encryption_configuration: dict | None = None,
         location: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
+        delegate_to: str | None = None,
+        bigquery_conn_id: str | None = None,
         **kwargs,
     ) -> None:
         if bigquery_conn_id:
@@ -1721,8 +1746,8 @@ class 
BigQueryDeleteDatasetOperator(GoogleCloudBaseOperator):
         project_id: str | None = None,
         delete_contents: bool = False,
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
+        delegate_to: str | None = None,
         **kwargs,
     ) -> None:
         self.dataset_id = dataset_id
@@ -1779,7 +1804,9 @@ class 
BigQueryCreateEmptyDatasetOperator(GoogleCloudBaseOperator):
         If set as a sequence, the identities from the list must grant
         Service Account Token Creator IAM role to the directly preceding 
identity, with first
         account from the list granting this role to the originating account 
(templated).
-    :param exists_ok: If ``True``, ignore "already exists" errors when 
creating the dataset.
+    :param if_exists: What should Airflow do if the dataset exists. If set to 
`log`, the TI will be passed to
+        success and an error message will be logged. Set to `ignore` to ignore 
the error, set to `fail` to
+        fail the TI, and set to `skip` to skip it.
         **Example**: ::
 
             create_new_dataset = BigQueryCreateEmptyDatasetOperator(
@@ -1789,6 +1816,7 @@ class 
BigQueryCreateEmptyDatasetOperator(GoogleCloudBaseOperator):
                 gcp_conn_id='_my_gcp_conn_',
                 task_id='newDatasetCreator',
                 dag=dag)
+    :param exists_ok: Deprecated - use `if_exists="ignore"` instead.
     """
 
     template_fields: Sequence[str] = (
@@ -1809,9 +1837,10 @@ class 
BigQueryCreateEmptyDatasetOperator(GoogleCloudBaseOperator):
         dataset_reference: dict | None = None,
         location: str | None = None,
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
-        exists_ok: bool = False,
+        if_exists: str = "log",
+        delegate_to: str | None = None,
+        exists_ok: bool | None = None,
         **kwargs,
     ) -> None:
 
@@ -1826,7 +1855,11 @@ class 
BigQueryCreateEmptyDatasetOperator(GoogleCloudBaseOperator):
             )
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
-        self.exists_ok = exists_ok
+        if exists_ok is not None:
+            warnings.warn("`exists_ok` parameter is deprecated, please use 
`if_exists`", DeprecationWarning)
+            self.if_exists = IfExistAction.IGNORE if exists_ok else 
IfExistAction.LOG
+        else:
+            self.if_exists = IfExistAction(if_exists)
 
         super().__init__(**kwargs)
 
@@ -1844,7 +1877,7 @@ class 
BigQueryCreateEmptyDatasetOperator(GoogleCloudBaseOperator):
                 dataset_id=self.dataset_id,
                 dataset_reference=self.dataset_reference,
                 location=self.location,
-                exists_ok=self.exists_ok,
+                exists_ok=self.if_exists == IfExistAction.IGNORE,
             )
             BigQueryDatasetLink.persist(
                 context=context,
@@ -1854,7 +1887,13 @@ class 
BigQueryCreateEmptyDatasetOperator(GoogleCloudBaseOperator):
             )
         except Conflict:
             dataset_id = self.dataset_reference.get("datasetReference", 
{}).get("datasetId", self.dataset_id)
-            self.log.info("Dataset %s already exists.", dataset_id)
+            error_msg = f"Dataset {dataset_id} already exists."
+            if self.if_exists == IfExistAction.LOG:
+                self.log.info(error_msg)
+            elif self.if_exists == IfExistAction.FAIL:
+                raise AirflowException(error_msg)
+            else:
+                raise AirflowSkipException(error_msg)
 
 
 class BigQueryGetDatasetOperator(GoogleCloudBaseOperator):
@@ -1897,8 +1936,8 @@ class BigQueryGetDatasetOperator(GoogleCloudBaseOperator):
         dataset_id: str,
         project_id: str | None = None,
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
+        delegate_to: str | None = None,
         **kwargs,
     ) -> None:
         self.dataset_id = dataset_id
@@ -1972,8 +2011,8 @@ class 
BigQueryGetDatasetTablesOperator(GoogleCloudBaseOperator):
         project_id: str | None = None,
         max_results: int | None = None,
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
+        delegate_to: str | None = None,
         **kwargs,
     ) -> None:
         self.dataset_id = dataset_id
@@ -2045,8 +2084,8 @@ class 
BigQueryPatchDatasetOperator(GoogleCloudBaseOperator):
         dataset_resource: dict,
         project_id: str | None = None,
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
+        delegate_to: str | None = None,
         **kwargs,
     ) -> None:
         warnings.warn(
@@ -2133,8 +2172,8 @@ class 
BigQueryUpdateTableOperator(GoogleCloudBaseOperator):
         table_id: str | None = None,
         project_id: str | None = None,
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
+        delegate_to: str | None = None,
         **kwargs,
     ) -> None:
         self.dataset_id = dataset_id
@@ -2227,8 +2266,8 @@ class 
BigQueryUpdateDatasetOperator(GoogleCloudBaseOperator):
         dataset_id: str | None = None,
         project_id: str | None = None,
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
+        delegate_to: str | None = None,
         **kwargs,
     ) -> None:
         self.dataset_id = dataset_id
@@ -2308,10 +2347,10 @@ class 
BigQueryDeleteTableOperator(GoogleCloudBaseOperator):
         *,
         deletion_dataset_table: str,
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
         ignore_if_missing: bool = False,
         location: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
+        delegate_to: str | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -2385,9 +2424,9 @@ class 
BigQueryUpsertTableOperator(GoogleCloudBaseOperator):
         table_resource: dict,
         project_id: str | None = None,
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
         location: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
+        delegate_to: str | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -2496,8 +2535,8 @@ class 
BigQueryUpdateTableSchemaOperator(GoogleCloudBaseOperator):
         include_policy_tags: bool = False,
         project_id: str | None = None,
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
+        delegate_to: str | None = None,
         **kwargs,
     ) -> None:
         self.schema_fields_updates = schema_fields_updates
@@ -2616,12 +2655,12 @@ class 
BigQueryInsertJobOperator(GoogleCloudBaseOperator):
         force_rerun: bool = True,
         reattach_states: set[str] | None = None,
         gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
         cancel_on_kill: bool = True,
         result_retry: Retry = DEFAULT_RETRY,
         result_timeout: float | None = None,
         deferrable: bool = False,
+        delegate_to: str | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py 
b/tests/providers/google/cloud/operators/test_bigquery.py
index d814c894bb..b14b43e108 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -26,7 +26,7 @@ import pytest
 from google.cloud.bigquery import DEFAULT_RETRY
 from google.cloud.exceptions import Conflict
 
-from airflow.exceptions import AirflowException, AirflowTaskTimeout, 
TaskDeferred
+from airflow.exceptions import AirflowException, AirflowSkipException, 
AirflowTaskTimeout, TaskDeferred
 from airflow.models import DAG
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
@@ -201,6 +201,41 @@ class 
TestBigQueryCreateEmptyTableOperator(unittest.TestCase):
         )
 
 
+@pytest.mark.parametrize(
+    "if_exists, is_conflict, expected_error, log_msg",
+    [
+        ("ignore", False, None, None),
+        ("log", False, None, None),
+        ("log", True, None, f"Table {TEST_DATASET}.{TEST_TABLE_ID} already 
exists."),
+        ("fail", False, None, None),
+        ("fail", True, AirflowException, None),
+        ("skip", False, None, None),
+        ("skip", True, AirflowSkipException, None),
+    ],
+)
+@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+def test_create_existing_table(mock_hook, caplog, if_exists, is_conflict, 
expected_error, log_msg):
+    operator = BigQueryCreateEmptyTableOperator(
+        task_id=TASK_ID,
+        dataset_id=TEST_DATASET,
+        project_id=TEST_GCP_PROJECT_ID,
+        table_id=TEST_TABLE_ID,
+        view=VIEW_DEFINITION,
+        if_exists=if_exists,
+    )
+    if is_conflict:
+        mock_hook.return_value.create_empty_table.side_effect = Conflict("any")
+    else:
+        mock_hook.return_value.create_empty_table.side_effect = None
+    if expected_error is not None:
+        with pytest.raises(expected_error):
+            operator.execute(context=MagicMock())
+    else:
+        operator.execute(context=MagicMock())
+    if log_msg is not None:
+        assert log_msg in caplog.text
+
+
 class TestBigQueryCreateExternalTableOperator(unittest.TestCase):
     
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute(self, mock_hook):
@@ -288,6 +323,40 @@ class 
TestBigQueryCreateEmptyDatasetOperator(unittest.TestCase):
         )
 
 
+@pytest.mark.parametrize(
+    "if_exists, is_conflict, expected_error, log_msg",
+    [
+        ("ignore", False, None, None),
+        ("log", False, None, None),
+        ("log", True, None, f"Dataset {TEST_DATASET} already exists."),
+        ("fail", False, None, None),
+        ("fail", True, AirflowException, None),
+        ("skip", False, None, None),
+        ("skip", True, AirflowSkipException, None),
+    ],
+)
+@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+def test_create_empty_dataset(mock_hook, caplog, if_exists, is_conflict, 
expected_error, log_msg):
+    operator = BigQueryCreateEmptyDatasetOperator(
+        task_id=TASK_ID,
+        dataset_id=TEST_DATASET,
+        project_id=TEST_GCP_PROJECT_ID,
+        location=TEST_DATASET_LOCATION,
+        if_exists=if_exists,
+    )
+    if is_conflict:
+        mock_hook.return_value.create_empty_dataset.side_effect = 
Conflict("any")
+    else:
+        mock_hook.return_value.create_empty_dataset.side_effect = None
+    if expected_error is not None:
+        with pytest.raises(expected_error):
+            operator.execute(context=MagicMock())
+    else:
+        operator.execute(context=MagicMock())
+    if log_msg is not None:
+        assert log_msg in caplog.text
+
+
 class TestBigQueryGetDatasetOperator(unittest.TestCase):
     
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute(self, mock_hook):

Reply via email to