This is an automated email from the ASF dual-hosted git repository.
shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 469693a2bb8 Add support for `parquetOptions` in GCSToBigQueryOperator (#60876)
469693a2bb8 is described below
commit 469693a2bb847b8e6129fd611466ab02722e8667
Author: Miriam Lauter <[email protected]>
AuthorDate: Sat Jan 31 02:28:55 2026 -0500
Add support for `parquetOptions` in GCSToBigQueryOperator (#60876)
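
This change lets DAG authors pass BigQuery's nested Parquet load options through the operator's existing `src_fmt_configs` argument; the operator places them under the `parquetOptions` key of the load / external-data configuration, as exercised by the tests below. A minimal usage sketch (the bucket, object path, and table names are illustrative, not taken from this commit):

    from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

    # Load Parquet files from GCS into BigQuery, forwarding the nested
    # parquetOptions fields (enableListInference, enumAsString, mapTargetType)
    # via src_fmt_configs. The identifiers below are illustrative only.
    load_parquet = GCSToBigQueryOperator(
        task_id="gcs_to_bq_parquet",
        bucket="example-bucket",
        source_objects=["data/part-*.parquet"],
        destination_project_dataset_table="example-project.example_dataset.example_table",
        source_format="PARQUET",
        write_disposition="WRITE_TRUNCATE",
        src_fmt_configs={
            "enableListInference": True,
            "enumAsString": True,
        },
    )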
---
.../google/cloud/transfers/gcs_to_bigquery.py | 56 +++++-
.../google/cloud/transfers/test_gcs_to_bigquery.py | 206 +++++++++++++++++++++
2 files changed, 255 insertions(+), 7 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 5c0ab5572d0..ffc88e77e21 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -544,7 +544,11 @@ class GCSToBigQueryOperator(BaseOperator):
"allowJaggedRows": self.allow_jagged_rows,
"encoding": self.encoding,
}
- src_fmt_to_param_mapping = {"CSV": "csvOptions", "GOOGLE_SHEETS": "googleSheetsOptions"}
+ src_fmt_to_param_mapping = {
+ "CSV": "csvOptions",
+ "GOOGLE_SHEETS": "googleSheetsOptions",
+ "PARQUET": "parquetOptions",
+ }
src_fmt_to_configs_mapping = {
"csvOptions": [
"allowJaggedRows",
@@ -557,6 +561,7 @@ class GCSToBigQueryOperator(BaseOperator):
"columnNameCharacterMap",
],
"googleSheetsOptions": ["skipLeadingRows"],
+ "parquetOptions": ["enumAsString", "enableListInference",
"mapTargetType"],
}
if self.source_format in src_fmt_to_param_mapping:
valid_configs = src_fmt_to_configs_mapping[src_fmt_to_param_mapping[self.source_format]]
@@ -687,8 +692,18 @@ class GCSToBigQueryOperator(BaseOperator):
"ORC": ["autodetect"],
}
+ # Some source formats have nested configuration options which are not available
+ # at the top level of the load configuration.
+ src_fmt_to_param_mapping = {"PARQUET": "parquetOptions"}
+ src_fmt_to_nested_configs_mapping = {
+ "parquetOptions": ["enumAsString", "enableListInference",
"mapTargetType"],
+ }
+
valid_configs = src_fmt_to_configs_mapping[self.source_format]
+ src_fmt_param = src_fmt_to_param_mapping.get(self.source_format)
+ valid_nested_configs = src_fmt_to_nested_configs_mapping[src_fmt_param] if src_fmt_param else None
+
# if following fields are not specified in src_fmt_configs,
# honor the top-level params for backward-compatibility
backward_compatibility_configs = {
@@ -701,7 +716,12 @@ class GCSToBigQueryOperator(BaseOperator):
}
self.src_fmt_configs = self._validate_src_fmt_configs(
- self.source_format, self.src_fmt_configs, valid_configs, backward_compatibility_configs
+ self.source_format,
+ self.src_fmt_configs,
+ valid_configs,
+ backward_compatibility_configs,
+ src_fmt_param,
+ valid_nested_configs,
)
self.configuration["load"].update(self.src_fmt_configs)
@@ -716,29 +736,51 @@ class GCSToBigQueryOperator(BaseOperator):
src_fmt_configs: dict,
valid_configs: list[str],
backward_compatibility_configs: dict | None = None,
+ src_fmt_param: str | None = None,
+ valid_nested_configs: list[str] | None = None,
) -> dict:
"""
- Validate the given src_fmt_configs against a valid configuration for the source format.
+ Validate and format the given src_fmt_configs against a valid configuration for the source format.
Adds the backward compatibility config to the src_fmt_configs.
+ Adds nested source format configurations if valid_nested_configs is provided.
+
:param source_format: File format to export.
:param src_fmt_configs: Configure optional fields specific to the source format.
:param valid_configs: Valid configuration specific to the source format
:param backward_compatibility_configs: The top-level params for backward-compatibility
+ :param src_fmt_param: The source format parameter for nested configurations.
+ Required when valid_nested_configs is provided.
+ :param valid_nested_configs: Valid nested configuration specific to the source format.
"""
+ valid_src_fmt_configs = {}
+
if backward_compatibility_configs is None:
backward_compatibility_configs = {}
for k, v in backward_compatibility_configs.items():
if k not in src_fmt_configs and k in valid_configs:
- src_fmt_configs[k] = v
+ valid_src_fmt_configs[k] = v
+
+ if valid_nested_configs is None:
+ valid_nested_configs = []
+
+ if valid_nested_configs:
+ if src_fmt_param is None:
+ raise ValueError("src_fmt_param is required when
valid_nested_configs is provided.")
- for k in src_fmt_configs:
- if k not in valid_configs:
+ valid_src_fmt_configs[src_fmt_param] = {}
+
+ for k, v in src_fmt_configs.items():
+ if k in valid_configs:
+ valid_src_fmt_configs[k] = v
+ elif k in valid_nested_configs:
+ valid_src_fmt_configs[src_fmt_param][k] = v
+ else:
raise ValueError(f"{k} is not a valid src_fmt_configs for type
{source_format}.")
- return src_fmt_configs
+ return valid_src_fmt_configs
def _cleanse_time_partitioning(
self, destination_dataset_table: str | None, time_partitioning_in: dict | None
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
index ba83ef96c68..8d059692787 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -1682,6 +1682,212 @@ class TestGCSToBigQueryOperator:
},
)
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_external_table_should_accept_parquet_format(self, hook):
+ hook.return_value.insert_job.side_effect = [
+ MagicMock(job_id=REAL_JOB_ID, error_result=False),
+ REAL_JOB_ID,
+ ]
+ hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ schema_fields=SCHEMA_FIELDS,
+ write_disposition=WRITE_DISPOSITION,
+ external_table=True,
+ project_id=JOB_PROJECT_ID,
+ source_format="PARQUET",
+ )
+
+ operator.execute(context=MagicMock())
+
+ hook.return_value.create_table.assert_called_once_with(
+ exists_ok=True,
+ location=None,
+ project_id=JOB_PROJECT_ID,
+ dataset_id=DATASET,
+ table_id=TABLE,
+ table_resource={
+ "tableReference": {
+ "projectId": PROJECT_ID,
+ "datasetId": DATASET,
+ "tableId": TABLE,
+ },
+ "externalDataConfiguration": {
+ "autodetect": True,
+ "sourceFormat": "PARQUET",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+ "compression": "NONE",
+ "ignoreUnknownValues": False,
+ "schema": {"fields": SCHEMA_FIELDS},
+ "parquetOptions": {},
+ },
+ "labels": {},
+ },
+ )
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_external_table_should_accept_parquet_format_and_options(self, hook):
+ hook.return_value.insert_job.side_effect = [
+ MagicMock(job_id=REAL_JOB_ID, error_result=False),
+ REAL_JOB_ID,
+ ]
+ hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ schema_fields=SCHEMA_FIELDS,
+ write_disposition=WRITE_DISPOSITION,
+ external_table=True,
+ project_id=JOB_PROJECT_ID,
+ source_format="PARQUET",
+ src_fmt_configs={
+ "enableListInference": True,
+ },
+ )
+
+ operator.execute(context=MagicMock())
+
+ hook.return_value.create_table.assert_called_once_with(
+ exists_ok=True,
+ location=None,
+ project_id=JOB_PROJECT_ID,
+ dataset_id=DATASET,
+ table_id=TABLE,
+ table_resource={
+ "tableReference": {
+ "projectId": PROJECT_ID,
+ "datasetId": DATASET,
+ "tableId": TABLE,
+ },
+ "externalDataConfiguration": {
+ "autodetect": True,
+ "sourceFormat": "PARQUET",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+ "compression": "NONE",
+ "ignoreUnknownValues": False,
+ "schema": {"fields": SCHEMA_FIELDS},
+ "parquetOptions": {
+ "enableListInference": True,
+ },
+ },
+ "labels": {},
+ },
+ )
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_without_external_table_should_accept_parquet_format(self, hook):
+ hook.return_value.insert_job.side_effect = [
+ MagicMock(job_id=REAL_JOB_ID, error_result=False),
+ REAL_JOB_ID,
+ ]
+ hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ write_disposition=WRITE_DISPOSITION,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ external_table=False,
+ project_id=JOB_PROJECT_ID,
+ source_format="PARQUET",
+ )
+
+ operator.execute(context=MagicMock())
+
+ calls = [
+ call(
+ configuration={
+ "load": {
+ "autodetect": True,
+ "createDisposition": "CREATE_IF_NEEDED",
+ "destinationTable": {
+ "projectId": "test-project",
+ "datasetId": "dataset",
+ "tableId": "table",
+ },
+ "sourceFormat": "PARQUET",
+ "sourceUris":
["gs://test-bucket/test/objects/test.csv"],
+ "writeDisposition": "WRITE_TRUNCATE",
+ "ignoreUnknownValues": False,
+ "parquetOptions": {},
+ }
+ },
+ job_id=REAL_JOB_ID,
+ location=None,
+ nowait=True,
+ project_id=JOB_PROJECT_ID,
+ retry=DEFAULT_RETRY,
+ timeout=None,
+ )
+ ]
+
+ hook.return_value.insert_job.assert_has_calls(calls)
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_without_external_table_should_accept_parquet_format_and_options(self, hook):
+ hook.return_value.insert_job.side_effect = [
+ MagicMock(job_id=REAL_JOB_ID, error_result=False),
+ REAL_JOB_ID,
+ ]
+ hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ write_disposition=WRITE_DISPOSITION,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ external_table=False,
+ project_id=JOB_PROJECT_ID,
+ source_format="PARQUET",
+ src_fmt_configs={
+ "enableListInference": True,
+ },
+ )
+
+ operator.execute(context=MagicMock())
+
+ calls = [
+ call(
+ configuration={
+ "load": {
+ "autodetect": True,
+ "createDisposition": "CREATE_IF_NEEDED",
+ "destinationTable": {
+ "projectId": "test-project",
+ "datasetId": "dataset",
+ "tableId": "table",
+ },
+ "sourceFormat": "PARQUET",
+ "sourceUris":
["gs://test-bucket/test/objects/test.csv"],
+ "writeDisposition": "WRITE_TRUNCATE",
+ "ignoreUnknownValues": False,
+ "parquetOptions": {
+ "enableListInference": True,
+ },
+ }
+ },
+ job_id=REAL_JOB_ID,
+ location=None,
+ nowait=True,
+ project_id=JOB_PROJECT_ID,
+ retry=DEFAULT_RETRY,
+ timeout=None,
+ )
+ ]
+
+ hook.return_value.insert_job.assert_has_calls(calls)
+
@pytest.fixture
def create_task_instance(create_task_instance_of_operator, session):