This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e68d4a7764 implement gcs_schema_object for BigQueryCreateExternalTableOperator (#30961)
e68d4a7764 is described below

commit e68d4a77645a5f0c199d5618912a99be71281920
Author: Attila Szombati <[email protected]>
AuthorDate: Mon May 8 22:19:53 2023 +0200

    implement gcs_schema_object for BigQueryCreateExternalTableOperator (#30961)
    
    
    ---------
    
    Co-authored-by: szombatiattila <[email protected]>
---
 airflow/providers/google/cloud/operators/bigquery.py    | 13 +++++++++++--
 tests/providers/google/cloud/operators/test_bigquery.py |  1 +
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 45c2e36c46..22150a6221 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -55,7 +55,6 @@ if TYPE_CHECKING:
     from airflow.models.taskinstancekey import TaskInstanceKey
     from airflow.utils.context import Context
 
-
 BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={job_id}"
 
 
@@ -1434,6 +1433,8 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
         If provided all other parameters are ignored. External schema from object will be resolved.
     :param schema_object: If set, a GCS object path pointing to a .json file that
         contains the schema for the table. (templated)
+    :param gcs_schema_bucket: GCS bucket name where the schema JSON is stored (templated).
+        The default value is self.bucket.
     :param source_format: File format of the data.
     :param autodetect: Try to detect schema and format options automatically.
         The schema_fields and schema_object options will be honored when specified explicitly.
@@ -1481,6 +1482,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
         "bucket",
         "source_objects",
         "schema_object",
+        "gcs_schema_bucket",
         "destination_project_dataset_table",
         "labels",
         "table_resource",
@@ -1499,6 +1501,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
         table_resource: dict[str, Any] | None = None,
         schema_fields: list | None = None,
         schema_object: str | None = None,
+        gcs_schema_bucket: str | None = None,
         source_format: str | None = None,
         autodetect: bool = False,
         compression: str | None = None,
@@ -1557,6 +1560,8 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
             )
             if not bucket:
                 raise ValueError("`bucket` is required when not using `table_resource`.")
+            if not gcs_schema_bucket:
+                gcs_schema_bucket = bucket
             if not source_objects:
                 raise ValueError("`source_objects` is required when not using `table_resource`.")
             if not source_format:
@@ -1574,6 +1579,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
             self.bucket = bucket
             self.source_objects = source_objects
             self.schema_object = schema_object
+            self.gcs_schema_bucket = gcs_schema_bucket
             self.destination_project_dataset_table = destination_project_dataset_table
             self.schema_fields = schema_fields
             self.source_format = source_format
@@ -1586,6 +1592,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
             self.bucket = ""
             self.source_objects = []
             self.schema_object = None
+            self.gcs_schema_bucket = ""
             self.destination_project_dataset_table = ""
 
         if table_resource and kwargs_passed:
@@ -1629,7 +1636,9 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
                 gcp_conn_id=self.google_cloud_storage_conn_id,
                 impersonation_chain=self.impersonation_chain,
             )
-            schema_fields = json.loads(gcs_hook.download(self.bucket, self.schema_object).decode("utf-8"))
+            schema_fields = json.loads(
+                gcs_hook.download(self.gcs_schema_bucket, self.schema_object).decode("utf-8")
+            )
         else:
             schema_fields = self.schema_fields
 
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index e8d51f0b93..d5fafb6995 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -237,6 +237,7 @@ class TestBigQueryCreateExternalTableOperator:
             
destination_project_dataset_table=f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
             schema_fields=[],
             bucket=TEST_GCS_BUCKET,
+            gcs_schema_bucket=TEST_GCS_BUCKET,
             source_objects=TEST_GCS_DATA,
             source_format=TEST_SOURCE_FORMAT,
             autodetect=True,

Reply via email to