This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a3a2b740c18 Support all bq load job and ext table config options in GCSToBigQueryOperator (#64505)
a3a2b740c18 is described below

commit a3a2b740c18a825dd7e0f7000976de6f7991aa74
Author: Miriam Lauter <[email protected]>
AuthorDate: Tue May 12 00:43:00 2026 -0400

    Support all bq load job and ext table config options in GCSToBigQueryOperator (#64505)
---
 .../google/cloud/transfers/gcs_to_bigquery.py      |  43 +++-
 .../google/cloud/transfers/test_gcs_to_bigquery.py | 259 ++++++++++++++++++---
 2 files changed, 270 insertions(+), 32 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index ffc88e77e21..cee73e5863b 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -20,6 +20,7 @@
 from __future__ import annotations
 
 import json
+import warnings
 from collections.abc import Sequence
 from typing import TYPE_CHECKING, Any
 
@@ -31,11 +32,11 @@ from google.cloud.bigquery import (
     ExtractJob,
     LoadJob,
     QueryJob,
-    SchemaField,
     UnknownJob,
 )
 from google.cloud.bigquery.table import EncryptionConfiguration, Table, 
TableReference
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.common.compat.sdk import AirflowException, conf
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, 
BigQueryJob
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
@@ -136,7 +137,18 @@ class GCSToBigQueryOperator(BaseOperator):
         future executions, you can pick up from the max ID.
     :param schema_update_options: Allows the schema of the destination
         table to be updated as a side effect of the load job.
-    :param src_fmt_configs: configure optional fields specific to the source 
format
+    :param src_fmt_configs: (Deprecated) configure optional fields specific to 
the source format.
+        Use ``extra_config`` instead. Note when migrating that 
``extra_config`` uses the fully-nested API
+        structure, so format-specific options must be nested under their 
parent key
+        (e.g., ``{"parquetOptions": {"enableListInference": True}}`` rather 
than
+        ``{"enableListInference": True}``).
+    :param extra_config: Dict of additional properties to apply over the 
BigQuery job configuration.
+        When ``external_table=False``, applied over the load job configuration
+        (see `JobConfigurationLoad 
<https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad>`_).
+        When ``external_table=True``, applied over the external table 
configuration
+        (see `ExternalDataConfiguration 
<https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ExternalDataConfiguration>`_).
+        Applied after all top-level params, so keys here take precedence over 
overlapping top-level
+        operator params. Nested dicts are replaced entirely, not deep-merged.
     :param external_table: Flag to specify if the destination table should be
         a BigQuery external table. Default Value is False.
     :param time_partitioning: configure optional time partitioning fields i.e.
@@ -189,6 +201,7 @@ class GCSToBigQueryOperator(BaseOperator):
         "destination_project_dataset_table",
         "impersonation_chain",
         "src_fmt_configs",
+        "extra_config",
     )
     template_ext: Sequence[str] = (".sql",)
     ui_color = "#f0eee4"
@@ -219,6 +232,7 @@ class GCSToBigQueryOperator(BaseOperator):
         gcp_conn_id="google_cloud_default",
         schema_update_options=(),
         src_fmt_configs=None,
+        extra_config: dict | None = None,
         external_table=False,
         time_partitioning=None,
         range_partitioning=None,
@@ -289,6 +303,17 @@ class GCSToBigQueryOperator(BaseOperator):
 
         self.schema_update_options = schema_update_options
         self.src_fmt_configs = src_fmt_configs
+        if src_fmt_configs:
+            warnings.warn(
+                "The 'src_fmt_configs' parameter is deprecated. Use 
'extra_config' instead. "
+                "Note: 'extra_config' uses the fully-nested API structure, so 
format-specific "
+                "options must be nested under their parent key "
+                "(e.g., {'parquetOptions': {'enableListInference': True}} 
rather than "
+                "{'enableListInference': True}).",
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
+        self.extra_config = extra_config
         self.time_partitioning = time_partitioning
         self.range_partitioning = range_partitioning
         self.cluster_fields = cluster_fields
@@ -570,11 +595,15 @@ class GCSToBigQueryOperator(BaseOperator):
             )
             
external_config_api_repr[src_fmt_to_param_mapping[self.source_format]] = 
self.src_fmt_configs
 
-        external_config = 
ExternalConfig.from_api_repr(external_config_api_repr)
         if self.schema_fields:
-            external_config.schema = [SchemaField.from_api_repr(f) for f in 
self.schema_fields]
+            external_config_api_repr["schema"] = {"fields": self.schema_fields}
         if self.max_bad_records:
-            external_config.max_bad_records = self.max_bad_records
+            external_config_api_repr["maxBadRecords"] = self.max_bad_records
+
+        if self.extra_config:
+            external_config_api_repr.update(self.extra_config)
+
+        external_config = 
ExternalConfig.from_api_repr(external_config_api_repr)
 
         # build table definition
         table = Table(
@@ -728,6 +757,10 @@ class GCSToBigQueryOperator(BaseOperator):
 
         if self.allow_jagged_rows:
             self.configuration["load"]["allowJaggedRows"] = 
self.allow_jagged_rows
+
+        if self.extra_config:
+            self.configuration["load"].update(self.extra_config)
+
         return self.configuration
 
     def _validate_src_fmt_configs(
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
index 8d059692787..be2f5ed52e5 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -27,6 +27,7 @@ from google.cloud.bigquery import DEFAULT_RETRY, Table
 from google.cloud.exceptions import Conflict
 from sqlalchemy import select
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.models.trigger import Trigger
 from airflow.providers.common.compat.openlineage.facet import (
     ColumnLineageDatasetFacet,
@@ -1739,20 +1740,21 @@ class TestGCSToBigQueryOperator:
         hook.return_value.generate_job_id.return_value = REAL_JOB_ID
         hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
 
-        operator = GCSToBigQueryOperator(
-            task_id=TASK_ID,
-            bucket=TEST_BUCKET,
-            source_objects=TEST_SOURCE_OBJECTS,
-            destination_project_dataset_table=TEST_EXPLICIT_DEST,
-            schema_fields=SCHEMA_FIELDS,
-            write_disposition=WRITE_DISPOSITION,
-            external_table=True,
-            project_id=JOB_PROJECT_ID,
-            source_format="PARQUET",
-            src_fmt_configs={
-                "enableListInference": True,
-            },
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, 
match="src_fmt_configs"):
+            operator = GCSToBigQueryOperator(
+                task_id=TASK_ID,
+                bucket=TEST_BUCKET,
+                source_objects=TEST_SOURCE_OBJECTS,
+                destination_project_dataset_table=TEST_EXPLICIT_DEST,
+                schema_fields=SCHEMA_FIELDS,
+                write_disposition=WRITE_DISPOSITION,
+                external_table=True,
+                project_id=JOB_PROJECT_ID,
+                source_format="PARQUET",
+                src_fmt_configs={
+                    "enableListInference": True,
+                },
+            )
 
         operator.execute(context=MagicMock())
 
@@ -1841,19 +1843,20 @@ class TestGCSToBigQueryOperator:
         ]
         hook.return_value.generate_job_id.return_value = REAL_JOB_ID
         hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
-        operator = GCSToBigQueryOperator(
-            task_id=TASK_ID,
-            bucket=TEST_BUCKET,
-            source_objects=TEST_SOURCE_OBJECTS,
-            write_disposition=WRITE_DISPOSITION,
-            destination_project_dataset_table=TEST_EXPLICIT_DEST,
-            external_table=False,
-            project_id=JOB_PROJECT_ID,
-            source_format="PARQUET",
-            src_fmt_configs={
-                "enableListInference": True,
-            },
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, 
match="src_fmt_configs"):
+            operator = GCSToBigQueryOperator(
+                task_id=TASK_ID,
+                bucket=TEST_BUCKET,
+                source_objects=TEST_SOURCE_OBJECTS,
+                write_disposition=WRITE_DISPOSITION,
+                destination_project_dataset_table=TEST_EXPLICIT_DEST,
+                external_table=False,
+                project_id=JOB_PROJECT_ID,
+                source_format="PARQUET",
+                src_fmt_configs={
+                    "enableListInference": True,
+                },
+            )
 
         operator.execute(context=MagicMock())
 
@@ -1888,6 +1891,208 @@ class TestGCSToBigQueryOperator:
 
         hook.return_value.insert_job.assert_has_calls(calls)
 
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_extra_config_is_merged_into_load_config(self, hook):
+        hook.return_value.insert_job.return_value = 
MagicMock(job_id=REAL_JOB_ID, error_result=False)
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            write_disposition=WRITE_DISPOSITION,
+            project_id=JOB_PROJECT_ID,
+            extra_config={"columnNameCharacterMap": "STRICT"},
+        )
+
+        operator.execute(context=MagicMock())
+
+        config = hook.return_value.insert_job.call_args[1]["configuration"]
+        assert config["load"]["columnNameCharacterMap"] == "STRICT"
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_extra_config_takes_precedence_over_top_level_params(self, hook):
+        hook.return_value.insert_job.return_value = 
MagicMock(job_id=REAL_JOB_ID, error_result=False)
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            write_disposition=WRITE_DISPOSITION,
+            project_id=JOB_PROJECT_ID,
+            autodetect=True,
+            extra_config={"autodetect": False},
+        )
+
+        operator.execute(context=MagicMock())
+
+        config = hook.return_value.insert_job.call_args[1]["configuration"]
+        assert config["load"]["autodetect"] is False
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_extra_config_with_nested_format_options(self, hook):
+        hook.return_value.insert_job.return_value = 
MagicMock(job_id=REAL_JOB_ID, error_result=False)
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            write_disposition=WRITE_DISPOSITION,
+            project_id=JOB_PROJECT_ID,
+            source_format="PARQUET",
+            extra_config={"parquetOptions": {"enableListInference": True}},
+        )
+
+        operator.execute(context=MagicMock())
+
+        config = hook.return_value.insert_job.call_args[1]["configuration"]
+        assert config["load"]["parquetOptions"] == {"enableListInference": 
True}
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_extra_config_is_merged_into_external_config(self, hook):
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            schema_fields=SCHEMA_FIELDS,
+            write_disposition=WRITE_DISPOSITION,
+            external_table=True,
+            project_id=JOB_PROJECT_ID,
+            extra_config={"maxBadRecords": 10},
+        )
+
+        operator.execute(context=MagicMock())
+
+        table_resource = 
hook.return_value.create_table.call_args[1]["table_resource"]
+        assert table_resource["externalDataConfiguration"]["maxBadRecords"] == 
10
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_max_bad_records_set_from_top_level_param_in_external_table(self, 
hook):
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            schema_fields=SCHEMA_FIELDS,
+            write_disposition=WRITE_DISPOSITION,
+            external_table=True,
+            project_id=JOB_PROJECT_ID,
+            max_bad_records=5,
+        )
+
+        operator.execute(context=MagicMock())
+
+        table_resource = 
hook.return_value.create_table.call_args[1]["table_resource"]
+        assert table_resource["externalDataConfiguration"]["maxBadRecords"] == 
5
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def 
test_extra_config_takes_precedence_over_max_bad_records_in_external_table(self, 
hook):
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            schema_fields=SCHEMA_FIELDS,
+            write_disposition=WRITE_DISPOSITION,
+            external_table=True,
+            project_id=JOB_PROJECT_ID,
+            max_bad_records=5,
+            extra_config={"maxBadRecords": 10},
+        )
+
+        operator.execute(context=MagicMock())
+
+        table_resource = 
hook.return_value.create_table.call_args[1]["table_resource"]
+        assert table_resource["externalDataConfiguration"]["maxBadRecords"] == 
10
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def 
test_extra_config_takes_precedence_over_schema_fields_in_external_table(self, 
hook):
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+
+        override_schema = [{"name": "override_col", "type": "INTEGER", "mode": 
"NULLABLE"}]
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            schema_fields=SCHEMA_FIELDS,
+            write_disposition=WRITE_DISPOSITION,
+            external_table=True,
+            project_id=JOB_PROJECT_ID,
+            extra_config={"schema": {"fields": override_schema}},
+        )
+
+        operator.execute(context=MagicMock())
+
+        table_resource = 
hook.return_value.create_table.call_args[1]["table_resource"]
+        assert table_resource["externalDataConfiguration"]["schema"]["fields"] 
== override_schema
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_src_fmt_configs_emits_deprecation_warning(self, hook):
+        hook.return_value.insert_job.return_value = 
MagicMock(job_id=REAL_JOB_ID, error_result=False)
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+
+        with pytest.warns(AirflowProviderDeprecationWarning, 
match="src_fmt_configs"):
+            operator = GCSToBigQueryOperator(
+                task_id=TASK_ID,
+                bucket=TEST_BUCKET,
+                source_objects=TEST_SOURCE_OBJECTS,
+                destination_project_dataset_table=TEST_EXPLICIT_DEST,
+                write_disposition=WRITE_DISPOSITION,
+                project_id=JOB_PROJECT_ID,
+                src_fmt_configs={"skipLeadingRows": 1},
+            )
+
+        operator.execute(context=MagicMock())
+
+        config = hook.return_value.insert_job.call_args[1]["configuration"]
+        assert config["load"]["skipLeadingRows"] == 1
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def 
test_src_fmt_configs_and_extra_config_both_applied_with_precedence(self, hook):
+        hook.return_value.insert_job.return_value = 
MagicMock(job_id=REAL_JOB_ID, error_result=False)
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, 
TABLE)
+
+        with pytest.warns(AirflowProviderDeprecationWarning, 
match="src_fmt_configs"):
+            operator = GCSToBigQueryOperator(
+                task_id=TASK_ID,
+                bucket=TEST_BUCKET,
+                source_objects=TEST_SOURCE_OBJECTS,
+                destination_project_dataset_table=TEST_EXPLICIT_DEST,
+                write_disposition=WRITE_DISPOSITION,
+                project_id=JOB_PROJECT_ID,
+                src_fmt_configs={"skipLeadingRows": 1},
+                extra_config={"skipLeadingRows": 5, "columnNameCharacterMap": 
"STRICT"},
+            )
+
+        operator.execute(context=MagicMock())
+
+        config = hook.return_value.insert_job.call_args[1]["configuration"]
+        # extra_config wins for overlapping key
+        assert config["load"]["skipLeadingRows"] == 5
+        assert config["load"]["columnNameCharacterMap"] == "STRICT"
+
 
 @pytest.fixture
 def create_task_instance(create_task_instance_of_operator, session):

Reply via email to