mlauter commented on code in PR #60876:
URL: https://github.com/apache/airflow/pull/60876#discussion_r2713134066


##########
providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py:
##########
@@ -716,29 +736,51 @@ def _validate_src_fmt_configs(
         src_fmt_configs: dict,
         valid_configs: list[str],
         backward_compatibility_configs: dict | None = None,
+        src_fmt_param: str | None = None,
+        valid_nested_configs: list[str] | None = None,
     ) -> dict:
         """
-        Validate the given src_fmt_configs against a valid configuration for 
the source format.
+        Validate and format the given src_fmt_configs against a valid 
configuration for the source format.
 
         Adds the backward compatibility config to the src_fmt_configs.
 
+        Adds nested source format configurations if valid_nested_configs is 
provided.
+
         :param source_format: File format to export.
         :param src_fmt_configs: Configure optional fields specific to the 
source format.
         :param valid_configs: Valid configuration specific to the source format
         :param backward_compatibility_configs: The top-level params for 
backward-compatibility
+        :param src_fmt_param: The source format parameter for nested 
configurations.
+        Required when valid_nested_configs is provided.
+        :param valid_nested_configs: Valid nested configuration specific to 
the source format.
         """
+        valid_src_fmt_configs = {}
+
         if backward_compatibility_configs is None:
             backward_compatibility_configs = {}
 
         for k, v in backward_compatibility_configs.items():
             if k not in src_fmt_configs and k in valid_configs:
-                src_fmt_configs[k] = v
+                valid_src_fmt_configs[k] = v
+
+        if valid_nested_configs is None:
+            valid_nested_configs = []
+
+        if valid_nested_configs:
+            if src_fmt_param is None:
+                raise ValueError("src_fmt_param is required when 
valid_nested_configs is provided.")
 
-        for k in src_fmt_configs:
-            if k not in valid_configs:
+            valid_src_fmt_configs[src_fmt_param] = {}
+
+        for k, v in src_fmt_configs.items():
+            if k in valid_configs:
+                valid_src_fmt_configs[k] = v
+            elif k in valid_nested_configs:
+                valid_src_fmt_configs[src_fmt_param][k] = v
+            else:

Review Comment:
   I'm not wild about adding complexity here, I think its pretty gross. 
   
   The issue is that both the external table and non external table code paths 
allow `src_fmt_configs` and use this method, but the non external table code 
path previously only allowed arguments that were available at the top level of 
the job config rather than nested `<srcFmt>Options` options.
   
   I'm totally open to suggestions if you think this warrants a larger refactor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to