This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 46fa5a2743 Fix BigQueryCreateExternalTableOperator when using a format different to CSV (#33540)
46fa5a2743 is described below
commit 46fa5a2743c0c864f5282abd6055c5418585955b
Author: Hussein Awala <[email protected]>
AuthorDate: Mon Aug 21 00:13:28 2023 +0200
Fix BigQueryCreateExternalTableOperator when using a format different to CSV (#33540)
* Fix BigQueryCreateExternalTableOperator when using a format different to CSV
* fix python object name
---
.../providers/google/cloud/operators/bigquery.py | 31 +++++++-----
.../google/cloud/operators/test_bigquery.py | 59 +++++++++++++++++++---
2 files changed, 69 insertions(+), 21 deletions(-)
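For context, here is a minimal, illustrative usage sketch of the operator with a non-CSV source (the bucket, object path, and table names below are hypothetical, not taken from this commit). Before this fix, csvOptions was always attached to externalDataConfiguration regardless of source_format, which can cause BigQuery to reject table creation for formats such as PARQUET; after it, csvOptions is only sent when source_format is "CSV":

from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

# All bucket/dataset/table names here are placeholders for illustration.
create_external_parquet_table = BigQueryCreateExternalTableOperator(
    task_id="create_external_parquet_table",
    bucket="example-bucket",                    # GCS bucket holding the source files
    source_objects=["data/*.parquet"],          # non-CSV source objects
    source_format="PARQUET",                    # previously failed because csvOptions was always included
    destination_project_dataset_table="example-project.example_dataset.example_table",
    autodetect=True,                            # let BigQuery infer the schema
)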
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index baedeb3347..75ae73b6f9 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -1797,6 +1797,22 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
default_project_id=bq_hook.project_id or "",
)
+ external_data_configuration = {
+ "source_uris": source_uris,
+ "source_format": self.source_format,
+ "autodetect": self.autodetect,
+ "compression": self.compression,
+ "maxBadRecords": self.max_bad_records,
+ }
+ if self.source_format == "CSV":
+ external_data_configuration["csvOptions"] = {
+ "fieldDelimiter": self.field_delimiter,
+ "skipLeadingRows": self.skip_leading_rows,
+ "quote": self.quote_character,
+ "allowQuotedNewlines": self.allow_quoted_newlines,
+ "allowJaggedRows": self.allow_jagged_rows,
+ }
+
table_resource = {
"tableReference": {
"projectId": project_id,
@@ -1805,20 +1821,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
},
"labels": self.labels,
"schema": {"fields": schema_fields},
- "externalDataConfiguration": {
- "source_uris": source_uris,
- "source_format": self.source_format,
- "maxBadRecords": self.max_bad_records,
- "autodetect": self.autodetect,
- "compression": self.compression,
- "csvOptions": {
- "fieldDelimiter": self.field_delimiter,
- "skipLeadingRows": self.skip_leading_rows,
- "quote": self.quote_character,
- "allowQuotedNewlines": self.allow_quoted_newlines,
- "allowJaggedRows": self.allow_jagged_rows,
- },
- },
+ "externalDataConfiguration": external_data_configuration,
"location": self.location,
"encryptionConfiguration": self.encryption_configuration,
}
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index d17f5498e2..4c5cd6f717 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -77,8 +77,10 @@ TEST_JOB_PROJECT_ID = "test-job-project"
TEST_DELETE_CONTENTS = True
TEST_TABLE_ID = "test-table-id"
TEST_GCS_BUCKET = "test-bucket"
-TEST_GCS_DATA = ["dir1/*.csv"]
-TEST_SOURCE_FORMAT = "CSV"
+TEST_GCS_CSV_DATA = ["dir1/*.csv"]
+TEST_SOURCE_CSV_FORMAT = "CSV"
+TEST_GCS_PARQUET_DATA = ["dir1/*.parquet"]
+TEST_SOURCE_PARQUET_FORMAT = "PARQUET"
DEFAULT_DATE = datetime(2015, 1, 1)
TEST_DAG_ID = "test-bigquery-operators"
TEST_TABLE_RESOURCES = {"tableReference": {"tableId": TEST_TABLE_ID}, "expirationTime": 1234567}
@@ -246,15 +248,15 @@ class TestBigQueryCreateEmptyTableOperator:
class TestBigQueryCreateExternalTableOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
- def test_execute(self, mock_hook):
+ def test_execute_with_csv_format(self, mock_hook):
operator = BigQueryCreateExternalTableOperator(
task_id=TASK_ID,
destination_project_dataset_table=f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
schema_fields=[],
bucket=TEST_GCS_BUCKET,
gcs_schema_bucket=TEST_GCS_BUCKET,
- source_objects=TEST_GCS_DATA,
- source_format=TEST_SOURCE_FORMAT,
+ source_objects=TEST_GCS_CSV_DATA,
+ source_format=TEST_SOURCE_CSV_FORMAT,
autodetect=True,
)
@@ -276,9 +278,9 @@ class TestBigQueryCreateExternalTableOperator:
"schema": {"fields": []},
"externalDataConfiguration": {
"source_uris": [
- f"gs://{TEST_GCS_BUCKET}/{source_object}" for
source_object in TEST_GCS_DATA
+ f"gs://{TEST_GCS_BUCKET}/{source_object}" for
source_object in TEST_GCS_CSV_DATA
],
- "source_format": TEST_SOURCE_FORMAT,
+ "source_format": TEST_SOURCE_CSV_FORMAT,
"maxBadRecords": 0,
"autodetect": True,
"compression": "NONE",
@@ -295,6 +297,49 @@ class TestBigQueryCreateExternalTableOperator:
}
)
+
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_execute_with_parquet_format(self, mock_hook):
+ operator = BigQueryCreateExternalTableOperator(
+ task_id=TASK_ID,
+ destination_project_dataset_table=f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
+ schema_fields=[],
+ bucket=TEST_GCS_BUCKET,
+ gcs_schema_bucket=TEST_GCS_BUCKET,
+ source_objects=TEST_GCS_PARQUET_DATA,
+ source_format=TEST_SOURCE_PARQUET_FORMAT,
+ autodetect=True,
+ )
+
+ mock_hook.return_value.split_tablename.return_value = (
+ TEST_GCP_PROJECT_ID,
+ TEST_DATASET,
+ TEST_TABLE_ID,
+ )
+
+ operator.execute(context=MagicMock())
+ mock_hook.return_value.create_empty_table.assert_called_once_with(
+ table_resource={
+ "tableReference": {
+ "projectId": TEST_GCP_PROJECT_ID,
+ "datasetId": TEST_DATASET,
+ "tableId": TEST_TABLE_ID,
+ },
+ "labels": None,
+ "schema": {"fields": []},
+ "externalDataConfiguration": {
+ "source_uris": [
+ f"gs://{TEST_GCS_BUCKET}/{source_object}" for
source_object in TEST_GCS_PARQUET_DATA
+ ],
+ "source_format": TEST_SOURCE_PARQUET_FORMAT,
+ "maxBadRecords": 0,
+ "autodetect": True,
+ "compression": "NONE",
+ },
+ "location": None,
+ "encryptionConfiguration": None,
+ }
+ )
+
class TestBigQueryDeleteDatasetOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")