tswast commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r620364813



##########
File path: airflow/providers/google/cloud/hooks/bigquery.py
##########
@@ -1353,6 +1353,86 @@ def get_schema(self, dataset_id: str, table_id: str, 
project_id: Optional[str] =
         table = self.get_client(project_id=project_id).get_table(table_ref)
         return {"fields": [s.to_api_repr() for s in table.schema]}
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_table_schema(
+        self,
+        schema_fields_updates: List[Dict[str, Any]],
+        dataset_id: str,
+        table_id: str,
+        project_id: Optional[str] = None,
+    ) -> None:
+        """
+        Update fields within a schema for a given dataset and table. Note that
+        some fields in schemas are immutable and trying to change them will 
cause
+        an exception.
+        If a new field is included it will be inserted which requires all 
required fields to be set.
+        See 
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        :param dataset_id: the dataset ID of the requested table to be updated
+        :type dataset_id: str
+        :param table_id: the table ID of the table to be updated
+        :type table_id: str
+        :param schema_fields_updates: a partial schema resource. see
+            
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        **Example**: ::
+
+            schema_fields_updates=[
+                {"name": "emp_name", "description": "Some New Description"},
+                {"name": "salary", "description": "Some New Description"},
+                {"name": "departments", "fields": [
+                    {"name": "name", "description": "Some New Description"},
+                    {"name": "type", "description": "Some New Description"}
+                ]},
+            ]
+
+        :type schema_fields_updates: List[dict]
+        :param project_id: The name of the project where we want to update the 
table.
+        :type project_id: str
+        """
+
+        def _build_new_schema(
+            current_schema: List[Dict[str, Any]], schema_fields_updates: 
List[Dict[str, Any]]
+        ) -> List[Dict[str, Any]]:
+
+            # Turn schema_field_updates into a dict keyed on field names
+            schema_fields_updates = {field["name"]: field for field in 
deepcopy(schema_fields_updates)}
+
+            # Create a new dict for storing the new schema, initated based on 
the current_schema
+            # as of Python 3.6, dicts retain order.
+            new_schema = {field["name"]: field for field in 
deepcopy(current_schema)}
+
+            # Each item in schema_fields_updates contains a potential patch
+            # to a schema field, iterate over them
+            for field_name, patched_value in schema_fields_updates.items():
+                # If this field already exists, update it
+                if field_name in new_schema:
+                    # If this field is of type RECORD and has a fields key we 
need to patch it recursively
+                    if "fields" in patched_value:
+                        patched_value["fields"] = _build_new_schema(
+                            new_schema[field_name]["fields"], 
patched_value["fields"]
+                        )
+                    # Update the new_schema with the patched value
+                    new_schema[field_name].update(patched_value)
+                # This is a new field, just include the whole configuration 
for it
+                else:
+                    new_schema[field_name] = patched_value
+
+            return list(new_schema.values())
+
+        current_table_schema = self.get_schema(

Review comment:
       > Well, if the current table has policyTags and you filter them out from 
the schema included in the update, it is my understanding that you will in 
effect delete them
   
   This is false.
   
   The way it works is that only the keys present in the JSON are updated. To remove policy tags, you would have to include the `policyTags` key and set it to an empty list (or possibly `None`).
   
   Including these keys, even if their values are exactly the same as what is already on the table, will attempt to update them. That requires additional permissions beyond those needed for general schema updates. (Hence the 403 error I linked to, and the resulting fix.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to