This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 46fa5a2743 Fix BigQueryCreateExternalTableOperator when using a format different to CSV (#33540)
46fa5a2743 is described below

commit 46fa5a2743c0c864f5282abd6055c5418585955b
Author: Hussein Awala <[email protected]>
AuthorDate: Mon Aug 21 00:13:28 2023 +0200

    Fix BigQueryCreateExternalTableOperator when using a format different to CSV (#33540)
    
    * Fix BigQueryCreateExternalTableOperator when using a format different to CSV
    
    * fix python object name
---
 .../providers/google/cloud/operators/bigquery.py   | 31 +++++++-----
 .../google/cloud/operators/test_bigquery.py        | 59 +++++++++++++++++++---
 2 files changed, 69 insertions(+), 21 deletions(-)
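
For context, here is a minimal usage sketch of the case this commit fixes: an external table backed by Parquet files. Before this change the operator always attached csvOptions to the externalDataConfiguration, which broke source formats other than CSV. The project, dataset, bucket, and object paths below are hypothetical placeholders; the parameter names match the ones exercised by the new test in this diff.

    from airflow.providers.google.cloud.operators.bigquery import (
        BigQueryCreateExternalTableOperator,
    )

    # Illustrative only: all names here are placeholders, not part of this commit.
    create_external_table = BigQueryCreateExternalTableOperator(
        task_id="create_external_parquet_table",
        destination_project_dataset_table="my-project.my_dataset.my_table",
        bucket="my-bucket",
        source_objects=["data/*.parquet"],
        source_format="PARQUET",  # previously failed; works after this fix
        autodetect=True,
    )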

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index baedeb3347..75ae73b6f9 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -1797,6 +1797,22 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
             default_project_id=bq_hook.project_id or "",
         )
 
+        external_data_configuration = {
+            "source_uris": source_uris,
+            "source_format": self.source_format,
+            "autodetect": self.autodetect,
+            "compression": self.compression,
+            "maxBadRecords": self.max_bad_records,
+        }
+        if self.source_format == "CSV":
+            external_data_configuration["csvOptions"] = {
+                "fieldDelimiter": self.field_delimiter,
+                "skipLeadingRows": self.skip_leading_rows,
+                "quote": self.quote_character,
+                "allowQuotedNewlines": self.allow_quoted_newlines,
+                "allowJaggedRows": self.allow_jagged_rows,
+            }
+
         table_resource = {
             "tableReference": {
                 "projectId": project_id,
@@ -1805,20 +1821,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
             },
             "labels": self.labels,
             "schema": {"fields": schema_fields},
-            "externalDataConfiguration": {
-                "source_uris": source_uris,
-                "source_format": self.source_format,
-                "maxBadRecords": self.max_bad_records,
-                "autodetect": self.autodetect,
-                "compression": self.compression,
-                "csvOptions": {
-                    "fieldDelimiter": self.field_delimiter,
-                    "skipLeadingRows": self.skip_leading_rows,
-                    "quote": self.quote_character,
-                    "allowQuotedNewlines": self.allow_quoted_newlines,
-                    "allowJaggedRows": self.allow_jagged_rows,
-                },
-            },
+            "externalDataConfiguration": external_data_configuration,
             "location": self.location,
             "encryptionConfiguration": self.encryption_configuration,
         }
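
In short, the configuration is now built incrementally, and csvOptions is attached only when source_format is "CSV". As a sketch, the externalDataConfiguration produced for a Parquet source now looks like this (key names are taken from the hunk above; the URI is an illustrative placeholder, and the compression/maxBadRecords values are the operator defaults asserted in the tests below):

    external_data_configuration = {
        "source_uris": ["gs://my-bucket/data/*.parquet"],  # placeholder URI
        "source_format": "PARQUET",
        "autodetect": True,
        "compression": "NONE",  # operator default
        "maxBadRecords": 0,  # operator default
        # No "csvOptions" key here: it is added only for source_format == "CSV".
    }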
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index d17f5498e2..4c5cd6f717 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -77,8 +77,10 @@ TEST_JOB_PROJECT_ID = "test-job-project"
 TEST_DELETE_CONTENTS = True
 TEST_TABLE_ID = "test-table-id"
 TEST_GCS_BUCKET = "test-bucket"
-TEST_GCS_DATA = ["dir1/*.csv"]
-TEST_SOURCE_FORMAT = "CSV"
+TEST_GCS_CSV_DATA = ["dir1/*.csv"]
+TEST_SOURCE_CSV_FORMAT = "CSV"
+TEST_GCS_PARQUET_DATA = ["dir1/*.parquet"]
+TEST_SOURCE_PARQUET_FORMAT = "PARQUET"
 DEFAULT_DATE = datetime(2015, 1, 1)
 TEST_DAG_ID = "test-bigquery-operators"
 TEST_TABLE_RESOURCES = {"tableReference": {"tableId": TEST_TABLE_ID}, "expirationTime": 1234567}
@@ -246,15 +248,15 @@ class TestBigQueryCreateEmptyTableOperator:
 
 class TestBigQueryCreateExternalTableOperator:
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-    def test_execute(self, mock_hook):
+    def test_execute_with_csv_format(self, mock_hook):
         operator = BigQueryCreateExternalTableOperator(
             task_id=TASK_ID,
             destination_project_dataset_table=f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
             schema_fields=[],
             bucket=TEST_GCS_BUCKET,
             gcs_schema_bucket=TEST_GCS_BUCKET,
-            source_objects=TEST_GCS_DATA,
-            source_format=TEST_SOURCE_FORMAT,
+            source_objects=TEST_GCS_CSV_DATA,
+            source_format=TEST_SOURCE_CSV_FORMAT,
             autodetect=True,
         )
 
@@ -276,9 +278,9 @@ class TestBigQueryCreateExternalTableOperator:
                 "schema": {"fields": []},
                 "externalDataConfiguration": {
                     "source_uris": [
-                        f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_DATA
+                        f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_CSV_DATA
                     ],
-                    "source_format": TEST_SOURCE_FORMAT,
+                    "source_format": TEST_SOURCE_CSV_FORMAT,
                     "maxBadRecords": 0,
                     "autodetect": True,
                     "compression": "NONE",
@@ -295,6 +297,49 @@ class TestBigQueryCreateExternalTableOperator:
             }
         )
 
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute_with_parquet_format(self, mock_hook):
+        operator = BigQueryCreateExternalTableOperator(
+            task_id=TASK_ID,
+            destination_project_dataset_table=f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
+            schema_fields=[],
+            bucket=TEST_GCS_BUCKET,
+            gcs_schema_bucket=TEST_GCS_BUCKET,
+            source_objects=TEST_GCS_PARQUET_DATA,
+            source_format=TEST_SOURCE_PARQUET_FORMAT,
+            autodetect=True,
+        )
+
+        mock_hook.return_value.split_tablename.return_value = (
+            TEST_GCP_PROJECT_ID,
+            TEST_DATASET,
+            TEST_TABLE_ID,
+        )
+
+        operator.execute(context=MagicMock())
+        mock_hook.return_value.create_empty_table.assert_called_once_with(
+            table_resource={
+                "tableReference": {
+                    "projectId": TEST_GCP_PROJECT_ID,
+                    "datasetId": TEST_DATASET,
+                    "tableId": TEST_TABLE_ID,
+                },
+                "labels": None,
+                "schema": {"fields": []},
+                "externalDataConfiguration": {
+                    "source_uris": [
+                        f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_PARQUET_DATA
+                    ],
+                    "source_format": TEST_SOURCE_PARQUET_FORMAT,
+                    "maxBadRecords": 0,
+                    "autodetect": True,
+                    "compression": "NONE",
+                },
+                "location": None,
+                "encryptionConfiguration": None,
+            }
+        )
+
 
 class TestBigQueryDeleteDatasetOperator:
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
