This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 469693a2bb8 Add support for `parquetOptions` in GCSToBigQueryOperator  (#60876)
469693a2bb8 is described below

commit 469693a2bb847b8e6129fd611466ab02722e8667
Author: Miriam Lauter <[email protected]>
AuthorDate: Sat Jan 31 02:28:55 2026 -0500

    Add support for `parquetOptions` in GCSToBigQueryOperator  (#60876)
---
 .../google/cloud/transfers/gcs_to_bigquery.py      |  56 +++++-
 .../google/cloud/transfers/test_gcs_to_bigquery.py | 206 +++++++++++++++++++++
 2 files changed, 255 insertions(+), 7 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 5c0ab5572d0..ffc88e77e21 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -544,7 +544,11 @@ class GCSToBigQueryOperator(BaseOperator):
             "allowJaggedRows": self.allow_jagged_rows,
             "encoding": self.encoding,
         }
-        src_fmt_to_param_mapping = {"CSV": "csvOptions", "GOOGLE_SHEETS": "googleSheetsOptions"}
+        src_fmt_to_param_mapping = {
+            "CSV": "csvOptions",
+            "GOOGLE_SHEETS": "googleSheetsOptions",
+            "PARQUET": "parquetOptions",
+        }
         src_fmt_to_configs_mapping = {
             "csvOptions": [
                 "allowJaggedRows",
@@ -557,6 +561,7 @@ class GCSToBigQueryOperator(BaseOperator):
                 "columnNameCharacterMap",
             ],
             "googleSheetsOptions": ["skipLeadingRows"],
+            "parquetOptions": ["enumAsString", "enableListInference", "mapTargetType"],
         }
         if self.source_format in src_fmt_to_param_mapping:
             valid_configs = src_fmt_to_configs_mapping[src_fmt_to_param_mapping[self.source_format]]
@@ -687,8 +692,18 @@ class GCSToBigQueryOperator(BaseOperator):
             "ORC": ["autodetect"],
         }
 
+        # Some source formats have nested configuration options which are not available
+        # at the top level of the load configuration.
+        src_fmt_to_param_mapping = {"PARQUET": "parquetOptions"}
+        src_fmt_to_nested_configs_mapping = {
+            "parquetOptions": ["enumAsString", "enableListInference", "mapTargetType"],
+        }
+
         valid_configs = src_fmt_to_configs_mapping[self.source_format]
 
+        src_fmt_param = src_fmt_to_param_mapping.get(self.source_format)
+        valid_nested_configs = src_fmt_to_nested_configs_mapping[src_fmt_param] if src_fmt_param else None
+
         # if following fields are not specified in src_fmt_configs,
         # honor the top-level params for backward-compatibility
         backward_compatibility_configs = {
@@ -701,7 +716,12 @@ class GCSToBigQueryOperator(BaseOperator):
         }
 
         self.src_fmt_configs = self._validate_src_fmt_configs(
-            self.source_format, self.src_fmt_configs, valid_configs, backward_compatibility_configs
+            self.source_format,
+            self.src_fmt_configs,
+            valid_configs,
+            backward_compatibility_configs,
+            src_fmt_param,
+            valid_nested_configs,
         )
 
         self.configuration["load"].update(self.src_fmt_configs)
@@ -716,29 +736,51 @@ class GCSToBigQueryOperator(BaseOperator):
         src_fmt_configs: dict,
         valid_configs: list[str],
         backward_compatibility_configs: dict | None = None,
+        src_fmt_param: str | None = None,
+        valid_nested_configs: list[str] | None = None,
     ) -> dict:
         """
-        Validate the given src_fmt_configs against a valid configuration for the source format.
+        Validate and format the given src_fmt_configs against a valid configuration for the source format.
 
         Adds the backward compatibility config to the src_fmt_configs.
 
+        Adds nested source format configurations if valid_nested_configs is provided.
+
         :param source_format: File format to export.
         :param src_fmt_configs: Configure optional fields specific to the source format.
         :param valid_configs: Valid configuration specific to the source format
         :param backward_compatibility_configs: The top-level params for backward-compatibility
+        :param src_fmt_param: The source format parameter for nested configurations.
+        Required when valid_nested_configs is provided.
+        :param valid_nested_configs: Valid nested configuration specific to the source format.
         """
+        valid_src_fmt_configs = {}
+
         if backward_compatibility_configs is None:
             backward_compatibility_configs = {}
 
         for k, v in backward_compatibility_configs.items():
             if k not in src_fmt_configs and k in valid_configs:
-                src_fmt_configs[k] = v
+                valid_src_fmt_configs[k] = v
+
+        if valid_nested_configs is None:
+            valid_nested_configs = []
+
+        if valid_nested_configs:
+            if src_fmt_param is None:
+                raise ValueError("src_fmt_param is required when valid_nested_configs is provided.")
 
-        for k in src_fmt_configs:
-            if k not in valid_configs:
+            valid_src_fmt_configs[src_fmt_param] = {}
+
+        for k, v in src_fmt_configs.items():
+            if k in valid_configs:
+                valid_src_fmt_configs[k] = v
+            elif k in valid_nested_configs:
+                valid_src_fmt_configs[src_fmt_param][k] = v
+            else:
                 raise ValueError(f"{k} is not a valid src_fmt_configs for type {source_format}.")
 
-        return src_fmt_configs
+        return valid_src_fmt_configs
 
     def _cleanse_time_partitioning(
         self, destination_dataset_table: str | None, time_partitioning_in: dict | None
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
index ba83ef96c68..8d059692787 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -1682,6 +1682,212 @@ class TestGCSToBigQueryOperator:
                 },
             )
 
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_external_table_should_accept_parquet_format(self, hook):
+        hook.return_value.insert_job.side_effect = [
+            MagicMock(job_id=REAL_JOB_ID, error_result=False),
+            REAL_JOB_ID,
+        ]
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            schema_fields=SCHEMA_FIELDS,
+            write_disposition=WRITE_DISPOSITION,
+            external_table=True,
+            project_id=JOB_PROJECT_ID,
+            source_format="PARQUET",
+        )
+
+        operator.execute(context=MagicMock())
+
+        hook.return_value.create_table.assert_called_once_with(
+            exists_ok=True,
+            location=None,
+            project_id=JOB_PROJECT_ID,
+            dataset_id=DATASET,
+            table_id=TABLE,
+            table_resource={
+                "tableReference": {
+                    "projectId": PROJECT_ID,
+                    "datasetId": DATASET,
+                    "tableId": TABLE,
+                },
+                "externalDataConfiguration": {
+                    "autodetect": True,
+                    "sourceFormat": "PARQUET",
+                    "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+                    "compression": "NONE",
+                    "ignoreUnknownValues": False,
+                    "schema": {"fields": SCHEMA_FIELDS},
+                    "parquetOptions": {},
+                },
+                "labels": {},
+            },
+        )
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_external_table_should_accept_parquet_format_and_options(self, hook):
+        hook.return_value.insert_job.side_effect = [
+            MagicMock(job_id=REAL_JOB_ID, error_result=False),
+            REAL_JOB_ID,
+        ]
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            schema_fields=SCHEMA_FIELDS,
+            write_disposition=WRITE_DISPOSITION,
+            external_table=True,
+            project_id=JOB_PROJECT_ID,
+            source_format="PARQUET",
+            src_fmt_configs={
+                "enableListInference": True,
+            },
+        )
+
+        operator.execute(context=MagicMock())
+
+        hook.return_value.create_table.assert_called_once_with(
+            exists_ok=True,
+            location=None,
+            project_id=JOB_PROJECT_ID,
+            dataset_id=DATASET,
+            table_id=TABLE,
+            table_resource={
+                "tableReference": {
+                    "projectId": PROJECT_ID,
+                    "datasetId": DATASET,
+                    "tableId": TABLE,
+                },
+                "externalDataConfiguration": {
+                    "autodetect": True,
+                    "sourceFormat": "PARQUET",
+                    "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+                    "compression": "NONE",
+                    "ignoreUnknownValues": False,
+                    "schema": {"fields": SCHEMA_FIELDS},
+                    "parquetOptions": {
+                        "enableListInference": True,
+                    },
+                },
+                "labels": {},
+            },
+        )
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_without_external_table_should_accept_parquet_format(self, hook):
+        hook.return_value.insert_job.side_effect = [
+            MagicMock(job_id=REAL_JOB_ID, error_result=False),
+            REAL_JOB_ID,
+        ]
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            write_disposition=WRITE_DISPOSITION,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            external_table=False,
+            project_id=JOB_PROJECT_ID,
+            source_format="PARQUET",
+        )
+
+        operator.execute(context=MagicMock())
+
+        calls = [
+            call(
+                configuration={
+                    "load": {
+                        "autodetect": True,
+                        "createDisposition": "CREATE_IF_NEEDED",
+                        "destinationTable": {
+                            "projectId": "test-project",
+                            "datasetId": "dataset",
+                            "tableId": "table",
+                        },
+                        "sourceFormat": "PARQUET",
+                        "sourceUris": ["gs://test-bucket/test/objects/test.csv"],
+                        "writeDisposition": "WRITE_TRUNCATE",
+                        "ignoreUnknownValues": False,
+                        "parquetOptions": {},
+                    }
+                },
+                job_id=REAL_JOB_ID,
+                location=None,
+                nowait=True,
+                project_id=JOB_PROJECT_ID,
+                retry=DEFAULT_RETRY,
+                timeout=None,
+            )
+        ]
+
+        hook.return_value.insert_job.assert_has_calls(calls)
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_without_external_table_should_accept_parquet_format_and_options(self, hook):
+        hook.return_value.insert_job.side_effect = [
+            MagicMock(job_id=REAL_JOB_ID, error_result=False),
+            REAL_JOB_ID,
+        ]
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            write_disposition=WRITE_DISPOSITION,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            external_table=False,
+            project_id=JOB_PROJECT_ID,
+            source_format="PARQUET",
+            src_fmt_configs={
+                "enableListInference": True,
+            },
+        )
+
+        operator.execute(context=MagicMock())
+
+        calls = [
+            call(
+                configuration={
+                    "load": {
+                        "autodetect": True,
+                        "createDisposition": "CREATE_IF_NEEDED",
+                        "destinationTable": {
+                            "projectId": "test-project",
+                            "datasetId": "dataset",
+                            "tableId": "table",
+                        },
+                        "sourceFormat": "PARQUET",
+                        "sourceUris": ["gs://test-bucket/test/objects/test.csv"],
+                        "writeDisposition": "WRITE_TRUNCATE",
+                        "ignoreUnknownValues": False,
+                        "parquetOptions": {
+                            "enableListInference": True,
+                        },
+                    }
+                },
+                job_id=REAL_JOB_ID,
+                location=None,
+                nowait=True,
+                project_id=JOB_PROJECT_ID,
+                retry=DEFAULT_RETRY,
+                timeout=None,
+            )
+        ]
+
+        hook.return_value.insert_job.assert_has_calls(calls)
+
 
 @pytest.fixture
 def create_task_instance(create_task_instance_of_operator, session):

Reply via email to